2014-01-07 52 views
0
func main() { 
    jobs := []Job{job1, job2, job3} 
    numOfJobs := len(jobs) 
    resultsChan := make(chan *Result, numOfJobs) 
    jobChan := make(chan *job, numOfJobs) 
    go consume(numOfJobs, jobChan, resultsChan) 
    for i := 0; i < numOfJobs; i++ { 
     jobChan <- jobs[i] 
    } 
    close(jobChan) 

    for i := 0; i < numOfJobs; i++ { 
     <-resultsChan 
    } 
    close(resultsChan) 
} 

func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) { 
    for i := 0; i < num; i++ { 
     go func() { 
      job := <-jobChan 
      resultsChan <- doJob(job) 
     }() 
    } 
} 

在上面的例子中,工作推入jobChan和夠程會拉斷jobChan兼執行工作,推動成果轉化resultsChan。然後我們將結果從resultChan中取出。Golang:生產者/消費者併發模型但系列化結果

問題1:

在我的代碼,沒有序列化/ linearilized結果。儘管工作按照job1,job2,job3的順序進行。結果可能會以job3,job1,job2的形式出現,具體取決於哪一個最長。

我仍然想要同時執行這些工作,但是,我需要確保結果以相同的順序出現在結果中,並作爲工作進行。

問題2:

我有大約30萬就業崗位,這意味着該代碼將產生高達300K夠程。這是有效的,有這麼多的goroutines,或者我會更好地在一起分組工作在一個100分左右,並讓每個goroutine通過100而不是1.

+0

這可能是重複的,請檢查以下鏈接:https://stackoverflow.com/questions/20744619/concurrent-producer-and-consumer-in-go/20745582#20745582 – ymg

+0

嗨Yasir,謝謝你。但是,我不認爲這是重複的。因爲你指出的問題似乎沒有解決線性化/序列化的結果。 – samol

回答

1

這是我處理序列化(和也設置了有限的工人)。我設置了一些帶有輸入和輸出字段以及同步通道的工作對象,然後我通過它們循環,完成他們所做的任何工作並給他們一份新工作。然後,我通過他們的最後一個通行證,拿起任何已完成的工作。請注意,您可能希望工作人員數量在某種程度上超過您的核心數量,這樣即使有一項異常長時間的工作,您也可以保持所有資源都處於忙碌狀態。代碼爲http://play.golang.org/p/PM9y4ieMxw及以下。

這是毛茸茸的(比我記得在坐下來寫一個例子之前更毛茸茸!) - 很想看看別人有什麼,不管是更好的實現還是完全不同的方式來實現你的目標。

package main 

import (
    "fmt" 
    "math/rand" 
    "runtime" 
    "time" 
) 

type Worker struct { 
    in  int 
    out int 
    inited bool 

    jobReady chan bool 
    done  chan bool 
} 

func (w *Worker) work() { 
    time.Sleep(time.Duration(rand.Float32() * float32(time.Second))) 
    w.out = w.in + 1000 
} 
func (w *Worker) listen() { 
    for <-w.jobReady { 
     w.work() 
     w.done <- true 
    } 
} 
func doSerialJobs(in chan int, out chan int) { 
    concurrency := 23 
    workers := make([]Worker, concurrency) 
    i := 0 
    // feed in and get out items 
    for workItem := range in { 
     w := &workers[i% 
      concurrency] 
     if w.inited { 
      <-w.done 
      out <- w.out 
     } else { 
      w.jobReady = make(chan bool) 
      w.done = make(chan bool) 
      w.inited = true 
      go w.listen() 
     } 
     w.in = workItem 
     w.jobReady <- true 
     i++ 
    } 
    // get out any job results left over after we ran out of input 
    for n := 0; n < concurrency; n++ { 
     w := &workers[i%concurrency] 
     if w.inited { 
      <-w.done 
      out <- w.out 
     } 
     close(w.jobReady) 
     i++ 
    } 
    close(out) 
} 
func main() { 
    runtime.GOMAXPROCS(10) 
    in, out := make(chan int), make(chan int) 
    allFinished := make(chan bool) 
    go doSerialJobs(in, out) 
    go func() { 
     for result := range out { 
      fmt.Println(result) 
     } 
     allFinished <- true 
    }() 
    for i := 0; i < 100; i++ { 
     in <- i 
    } 
    close(in) 
    <-allFinished 
} 

請注意,只有inout在這個例子中進行實際數據 - 所有其他渠道都只是進行同步。