所以我看到很多方法實現一個消費者和許多生產者在圍棋 - 經典fanIn函數從Concurrency in Go談話。轉:一個生產者很多消費者
我想要的是一個fanOut函數。它將一個通道從參數中讀取一個值作爲參數,並返回一個寫入該值副本的通道片段。
是否有一個正確的/推薦的方式來實現這個?
所以我看到很多方法實現一個消費者和許多生產者在圍棋 - 經典fanIn函數從Concurrency in Go談話。轉:一個生產者很多消費者
我想要的是一個fanOut函數。它將一個通道從參數中讀取一個值作爲參數,並返回一個寫入該值副本的通道片段。
是否有一個正確的/推薦的方式來實現這個?
你幾乎描述了最好的方式來做到這一點,但這是一小部分代碼。
轉到操場:https://play.golang.org/p/jwdtDXVHJk
package main
import (
"fmt"
"time"
)
func producer(iters int) <-chan int {
c := make(chan int)
go func() {
for i := 0; i < iters; i++ {
c <- i
time.Sleep(1 * time.Second)
}
close(c)
}()
return c
}
func consumer(cin <-chan int) {
for i := range cin {
fmt.Println(i)
}
}
func fanOut(ch <-chan int, size, lag int) []chan int {
cs := make([]chan int, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan int, lag)
}
go func() {
for i := range ch {
for _, c := range cs {
c <- i
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func fanOutUnbuffered(ch <-chan int, size int) []chan int {
cs := make([]chan int, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan int)
}
go func() {
for i := range ch {
for _, c := range cs {
c <- i
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func main() {
c := producer(10)
chans := fanOutUnbuffered(c, 3)
go consumer(chans[0])
go consumer(chans[1])
consumer(chans[2])
}
的重要組成部分,要注意的是我們如何關閉輸出通道一旦輸入通道已經用完。另外如果其中一個輸出通道阻塞發送,它將阻止其他輸出通道上的發送。我們通過設置通道的緩衝區大小來控制滯後量。
下面這個解決方案是有點做作,但它爲我工作:
package main
import (
"fmt"
"time"
"crypto/rand"
"encoding/binary"
)
func handleNewChannels(arrchangen chan [](chan uint32),
intchangen chan (chan uint32)) {
currarr := []chan uint32{}
arrchangen <- currarr
for {
newchan := <-intchangen
currarr = append(currarr, newchan)
arrchangen <- currarr
}
}
func sendToChannels(arrchangen chan [](chan uint32)) {
tick := time.Tick(1 * time.Second)
currarr := <-arrchangen
for {
select {
case <-tick:
sent := false
var n uint32
binary.Read(rand.Reader, binary.LittleEndian, &n)
for i := 0 ; i < len(currarr) ; i++ {
currarr[i] <- n
sent = true
}
if sent {
fmt.Println("Sent generated ", n)
}
case newarr := <-arrchangen:
currarr = newarr
}
}
}
func handleChannel(tchan chan uint32) {
for {
val := <-tchan
fmt.Println("Got the value ", val)
}
}
func createChannels(intchangen chan (chan uint32)) {
othertick := time.Tick(5 * time.Second)
for {
<-othertick
fmt.Println("Creating new channel! ")
newchan := make(chan uint32)
intchangen <- newchan
go handleChannel(newchan)
}
}
func main() {
arrchangen := make(chan [](chan uint32))
intchangen := make(chan (chan uint32))
go handleNewChannels(arrchangen, intchangen)
go sendToChannels(arrchangen)
createChannels(intchangen)
}
好極了!謝謝。這是關閉了渠道,這讓我很不舒服。作爲對未來需要此功能的人們的感謝和快速參考,以下是一個正在運行的版本:http://play.golang.org/p/jwdtDXVHJk – Carl