2013-10-04 67 views
4

注 - 新手在Go。一個通道多路複用器

我寫了一個多路複用器,應該合併通道數組的輸出爲一個。滿意的建設性批評。

func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    n := len(channels) 
    // The channel to output to. 
    ch := make(chan big.Int, n) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func() { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      n -= 1 
      // Close output if all closed now. 
      if n == 0 { 
       close(ch) 
      } 
     }() 
    } 
    return ch 
} 

我與測試它:

func fromTo(f, t int) chan big.Int { 
    ch := make(chan big.Int) 

    go func() { 
     for i := f; i < t; i++ { 
      fmt.Println("Feed:", i) 
      ch <- *big.NewInt(int64(i)) 
     } 
     close(ch) 
    }() 
    return ch 
} 

func testMux() { 
    r := make([]chan big.Int, 10) 
    for i := 0; i < 10; i++ { 
     r[i] = fromTo(i*10, i*10+10) 
    } 
    all := Mux(r) 
    // Roll them out. 
    for l := range all { 
     fmt.Println(l) 
    } 
} 

,但我的輸出是很奇怪:

Feed: 0 
Feed: 10 
Feed: 20 
Feed: 30 
Feed: 40 
Feed: 50 
Feed: 60 
Feed: 70 
Feed: 80 
Feed: 90 
Feed: 91 
Feed: 92 
Feed: 93 
Feed: 94 
Feed: 95 
Feed: 96 
Feed: 97 
Feed: 98 
Feed: 99 
{false [90]} 
{false [91]} 
{false [92]} 
{false [93]} 
{false [94]} 
{false [95]} 
{false [96]} 
{false [97]} 
{false [98]} 
{false [99]} 

所以我的問題:

  • 有什麼事我在MUX中做錯了嗎?
  • 爲什麼我只能從我的輸出通道獲取最後10個?
  • 爲什麼餵食看起來很奇怪? (每個輸入通道的第一個,所有最後一個通道,然後都沒有)
  • 有沒有更好的方法來做到這一點?

我所需要的所有輸入通道中的具有相等權利至輸出信道 - 即我不能有所有的輸出從一個通道,然後將所有來自下一等


對於任何感興趣 - 這是修復後的最終代碼和正確的(可能)使用的sync.WaitGroup

import (
    "math/big" 
    "sync" 
) 

/* 
    Multiplex a number of channels into one. 
*/ 
func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    var wg sync.WaitGroup 
    wg.Add(len(channels)) 
    // The channel to output to. 
    ch := make(chan big.Int, len(channels)) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func(c <-chan big.Int) { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      wg.Done() 
     }(c) 
    } 
    // Close the channel when the pumping is finished. 
    go func() { 
     // Wait for everyone to be done. 
     wg.Wait() 
     // Close. 
     close(ch) 
    }() 
    return ch 
} 

回答

2

您的每一個夠程從Mux催生結束了從同一渠道拉動,因爲c獲得更新循環的每個迭代–他們不只是捕獲的價值c

for _, c := range channels { 
    go func(c <-chan big.Int) { 
     ... 
    }(c) 
} 

您可以測試這個修改here:如果您通過通道向夠程,像這樣你會得到預期的結果。

另一個可能的問題是你對n變量的處理:如果你正在運行GOMAXPROCS != 1,你可能有兩個goroutines試圖立即更新它。 sync.WaitGroup類型是等待goroutine完成的更安全的方式。

+0

謝謝 - 這完全解釋了我的問題。結果會在任何體系結構中始終如一地享有所有渠道的平等權利? – OldCurmudgeon

+0

您是否問每個餵食'ch'的goroutine是否會公平計劃?我不知道這是否定義。如果您需要特定的交叉結果,則可能需要更多。 –

+0

我擔心在某些環境下,每個頻道可能會耗盡以便在下一次查看前消耗殆盡。這必須避免。我不需要特定的順序,但我需要在所有渠道之間保持公平的平衡。 – OldCurmudgeon

1

要使用range語句時被分配一個局部變量的基礎上James Hentridge答案,一個慣用的方式來處理重新assignement問題股份價值:

for _, c := range channels { 
    c := c 
    go func() { 
    ... 
    }() 
}