2013-06-05 18 views
11

所以我看到很多方法實現一個消費者和許多生產者在圍棋 - 經典fanIn函數從Concurrency in Go談話。轉:一個生產者很多消費者

我想要的是一個fanOut函數。它將一個通道從參數中讀取一個值作爲參數,並返回一個寫入該值副本的通道片段。

是否有一個正確的/推薦的方式來實現這個?

回答

13

你幾乎描述了最好的方式來做到這一點,但這是一小部分代碼。

轉到操場:https://play.golang.org/p/jwdtDXVHJk

package main 

import (
    "fmt" 
    "time" 
) 

func producer(iters int) <-chan int { 
    c := make(chan int) 
    go func() { 
     for i := 0; i < iters; i++ { 
      c <- i 
      time.Sleep(1 * time.Second) 
     } 
     close(c) 
    }() 
    return c 
} 

func consumer(cin <-chan int) { 
    for i := range cin { 
     fmt.Println(i) 
    } 
} 

func fanOut(ch <-chan int, size, lag int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int, lag) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func fanOutUnbuffered(ch <-chan int, size int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func main() { 
    c := producer(10) 
    chans := fanOutUnbuffered(c, 3) 
    go consumer(chans[0]) 
    go consumer(chans[1]) 
    consumer(chans[2]) 
} 

的重要組成部分,要注意的是我們如何關閉輸出通道一旦輸入通道已經用完。另外如果其中一個輸出通道阻塞發送,它將阻止其他輸出通道上的發送。我們通過設置通道的緩衝區大小來控制滯後量。

+1

好極了!謝謝。這是關閉了渠道,這讓我很不舒服。作爲對未來需要此功能的人們的感謝和快速參考,以下是一個正在運行的版本:http://play.golang.org/p/jwdtDXVHJk – Carl

2

下面這個解決方案是有點做作,但它爲我工作:

package main 

import (
    "fmt" 
    "time" 
    "crypto/rand" 
    "encoding/binary" 
) 

func handleNewChannels(arrchangen chan [](chan uint32), 
         intchangen chan (chan uint32)) { 
    currarr := []chan uint32{} 
    arrchangen <- currarr 
    for { 
     newchan := <-intchangen 
     currarr = append(currarr, newchan) 
     arrchangen <- currarr 
    } 
} 

func sendToChannels(arrchangen chan [](chan uint32)) { 
    tick := time.Tick(1 * time.Second) 
    currarr := <-arrchangen 
    for { 
     select { 
     case <-tick: 
      sent := false 
      var n uint32 
      binary.Read(rand.Reader, binary.LittleEndian, &n) 
      for i := 0 ; i < len(currarr) ; i++ { 
       currarr[i] <- n 
       sent = true 
      } 
      if sent { 
       fmt.Println("Sent generated ", n) 
      } 
     case newarr := <-arrchangen: 
      currarr = newarr 
     } 
    } 
} 
func handleChannel(tchan chan uint32) { 
    for { 
     val := <-tchan 
     fmt.Println("Got the value ", val) 
    } 
} 

func createChannels(intchangen chan (chan uint32)) { 
    othertick := time.Tick(5 * time.Second) 
    for { 
     <-othertick 
     fmt.Println("Creating new channel! ") 
     newchan := make(chan uint32) 
     intchangen <- newchan 
     go handleChannel(newchan) 
    } 
} 

func main() { 
    arrchangen := make(chan [](chan uint32)) 
    intchangen := make(chan (chan uint32)) 
    go handleNewChannels(arrchangen, intchangen) 
    go sendToChannels(arrchangen) 
    createChannels(intchangen) 
} 
相關問題