2012-06-11 81 views
11

這個想法是在切片中有可變數量的通道,將通過它們接收的每個值推入單個通道,並且一旦最後一個輸入通道關閉,關閉該輸出通道。像這樣的事情,但對於一些渠道兩個以上的:是否可以將多個通道複用爲一個?

func multiplex(cin1, cin2, cout chan int) { 
    n := 2 
    for { 
     select { 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 

     case v, ok := <-cin2: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 
     } 

     if n == 0 { 
      close(cout) 
      break 
     } 
    } 
} 

上面的代碼避免忙循環,因爲沒有default情況下,這是個好(編輯:它看起來像的存在「確定」使得select語句無阻塞,並且循環完全是忙的,但是爲了舉例,將代碼視爲阻塞)。使用任意數量的輸入通道也可以實現同樣的功能?顯然,這可以通過將切片成對地縮小到單個通道來完成,但是如果可能的話,我會對更簡單的解決方案更感興趣。

回答

24

我相信這個片段做了你在找什麼。我已經改變了簽名,因此很明顯輸入和輸出只能用於一個方向上的通信。注意增加了一個sync.WaitGroup,你需要一些方式讓所有的輸入信號表明它們已經完成,這很容易。

func combine(inputs []<-chan int, output chan<- int) { 
    var group sync.WaitGroup 
    for i := range inputs { 
    group.Add(1) 
    go func(input <-chan int) { 
     for val := range input { 
     output <- val 
     } 
     group.Done() 
    } (inputs[i]) 
    } 
    go func() { 
    group.Wait() 
    close(output) 
    }() 
} 
+1

啊,非常好的解決方案,清晰簡潔。謝謝! – elpres

+0

現在有一個包含函數的包(https://godoc.org/github.com/eapache/channels#Multiplex),它使用反射而不是多個goroutines來解決問題。 – Evan

0

使用goroutines我製作了這個。這是你想要的嗎?

package main 

import (
    "fmt" 
) 

func multiplex(cin []chan int, cout chan int) { 
    n := len(cin) 
    for _, ch := range cin { 
     go func(src chan int) { 
      for { 
       v, ok := <-src 
       if ok { 
        cout <- v 
       } else { 
        n-- // a little dangerous. Maybe use a channel to avoid missed decrements 
        if n == 0 { 
         close(cout) 
        } 
        break 
       } 
      } 
     }(ch) 
    } 
} 

// a main to test the multiplex 
func main() { 
    cin := make([]chan int, 3) 
    cin[0] = make(chan int, 2) 
    cin[1] = make(chan int, 2) 
    cin[2] = make(chan int, 2) 
    cout := make(chan int, 2) 
    multiplex(cin, cout) 
    cin[1] <- 1 
    cin[0] <- 2 
    cin[2] <- 3 
    cin[1] <- 4 
    cin[0] <- 5 
    close(cin[1]) 
    close(cin[0]) 
    close(cin[2]) 
    for { 
     v, ok := <-cout 
     if ok { 
      fmt.Println(v) 
     } else { 
      break 
     } 
    } 
} 

編輯:參考文獻:

http://golang.org/ref/spec#Receive_operator

http://golang.org/ref/spec#Close

+0

該文檔說,如果您從頻道讀取值爲「,好」,該操作不會阻止。然後'ok'的值就是'false',執行繼續。如果這是正確的(我對Go很陌生並且不能完全說明),那麼如果通道是空的但還沒有關閉,那麼'if ok'行將評估爲'false'並執行'else'分支。 但是,如果你用select語句替換「v,ok:= < - src」和「if」,那麼它可能工作。必須測試一下。感謝您的回覆,順便說一句。 – elpres

+1

你從哪裏讀到該操作沒有阻止?我沒有找到它,它似乎不符合我觀察到的。我從文檔中看到,一旦頻道關閉,它不會阻止*。 –

+1

這似乎來自舊版本的規範,例如[這裏](http://go.googlecode.com/hg/doc/go_spec.html?r=c64e293#Communication_operators),看看「方法表達式」之前的最後一段。在當前版本中,這段話有所改變,並且說「由於頻道是_closed和empty_(false)」而返回零值。這聽起來像'false'只在渠道排乾和關閉後才返回,對嗎?那意味着我錯了。 – elpres

2

編輯:加入成對還原示例代碼和答案的重新排序部件。

首選的解決方案是「重組,以便您沒有渠道片」的非答案。重組通常可以使用多個goroutine可以發送到單個通道的功能。因此,不要讓每個來源都在不同的頻道上發送,然後不得不處理來自一堆頻道的接收,只需創建一個頻道,並讓所有來源在該頻道上發送即可。

Go沒有提供從一部分頻道接收的功能。這是一個經常被問到的問題,雖然剛剛給出的解決方案是首選,但也有編程的方法。我認爲你在原始問題中提出的「減少片段配對」的解決方案是一種二元分治法。這工作得很好,只要你有一個將兩個通道複用爲一個的解決方案。你的示例代碼非常接近工作。

你只是缺少一個讓你的示例代碼工作的小技巧。在你遞減n的地方,增加一條線將通道變量設置爲零。例如,我將代碼讀取爲

case v, ok := <-cin1: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin1 = nil 
     } 
    case v, ok := <-cin2: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin2 = nil 
     } 
    } 

此解決方案可以滿足您的要求並且不會忙於等待。

那麼,將這一解決方案成復片功能的完整的例子:

package main 

import (
    "fmt" 
    "time" 
) 

func multiplex(cin []chan int, cout chan int) { 
    var cin0, cin1 chan int 
    switch len(cin) { 
    case 2: 
     cin1 = cin[1] 
     fallthrough 
    case 1: 
     cin0 = cin[0] 
    case 0: 
    default: 
     cin0 = make(chan int) 
     cin1 = make(chan int) 
     half := len(cin)/2 
     go multiplex(cin[:half], cin0) 
     go multiplex(cin[half:], cin1) 
    } 
    for cin0 != nil || cin1 != nil { 
     select { 
     case v, ok := <-cin0: 
      if ok { 
       cout <- v 
      } else { 
       cin0 = nil 
      } 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       cin1 = nil 
      } 
     } 
    } 
    close(cout) 
} 

func main() { 
    cin := []chan int{ 
     make(chan int), 
     make(chan int), 
     make(chan int), 
    } 
    cout := make(chan int) 
    for i, c := range cin { 
     go func(x int, cx chan int) { 
      for i := 1; i <= 3; i++ { 
       time.Sleep(100 * time.Millisecond) 
       cx <- x*10 + i 
      } 
      close(cx) 
     }(i, c) 
    } 
    go multiplex(cin, cout) 
    for { 
     select { 
     case v, ok := <-cout: 
      if ok { 
       fmt.Println("main gets", v) 
      } else { 
       return 
      } 
     } 
    } 
} 
+1

不,不是。我正在尋找與func multiplex(cin [] chan int,cout chan int)一樣的函數,即可以在任意數量的輸入通道上運行而不是硬編碼爲2的輸入通道。 – elpres

相關問題