2017-10-20 153 views
0

我正在關注https://blog.golang.org/pipelines文章以實施幾個階段。在golang中,如何編寫一個流水線階段,爲下一階段引入延遲?

我需要其中一個階段來引入流水線中下一階段傳遞事件之前幾秒的延遲。

我對下面的代碼的擔心是,它會產生無限數量的go.sout()方法,在傳遞事件之前。有沒有更好的方法來做到這一點?

謝謝!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    out := make(chan *Bar, 10000) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 

回答

1

你可以使用其他通道,以限制主動夠程的數量你的循環能夠創建。

const numRoutines = 10 

func fooStage(inChan <-chan *Bar) <-chan *Bar { 
    out := make(chan *Bar, 10000) 
    routines := make(chan struct{}, numRoutines) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       routines <- struct{}{} 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
        <-routines 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 
+0

謝謝,這似乎是一個好主意。我能看到的唯一缺點是,如果「例程」頻道被阻止,事件將被延遲超過5秒鐘。沒有好的方法來解決這個問題,但是我並沒有在事件中加入時間戳。 – ultimoo

+1

@ultimoo由於等待時間爲5秒,您可以輕鬆運行數百或數千個goroutines,這將減少實際的事件等待時間。這樣的事情很難通過閱讀代碼來確定。測試和基準測試將需要真正確定事情如何實際運行。 – RayfenWindspear

+0

當然,這更像是一個實驗。我的直覺是運行幾千個這樣的goroutines應該是可以的 - 因爲他們所做的只是運行'time.Sleep()',它們不會在處理器的大部分使用壽命中被安排。 – ultimoo

0

您可以使用time.Ticker

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    //... some code 
    ticker := time.NewTicker(5 * time.Second) 
    <-ticker // the delay, probably need to call twice 
    ticker.Stop() 
    close(ticker.C) 
    //... rest code 
} 
+0

你能解釋一下它如何與後續事件一起工作嗎?如果兩個事件通過inChan進入,第二個事件是否等待10秒? – ultimoo

+0

'<-ticker'應該比指定的時間段更少返回。去嘗試一下 –

1

您可以手動修復夠程的數量 - 僅開始編號需要。

func sleepStage(in <-chan *Bar) (out <-chan *Bar) { 
    out = make(<-chan *Bar) 
    wg := sync.WaitGroup 
    for i:=0; i < N; i++ { // Number of goroutines in parallel 
      wg.Add(1) 
      go func(){ 
       defer wg.Done() 
       for e := range in { 
        time.Sleep(5*time.Seconds) 
        out <- e 
       } 
      }() 
     } 
     go func(){} 
      wg.Wait() 
      close(out) 
     }() 
     return out 
    } 
相關問題