2016-07-15 122 views
0

我想將從頻道接收的數據廣播到頻道列表。頻道列表是動態的,可以在運行階段修改。在Go中通過多個頻道廣播頻道

作爲Go中的新開發者,我寫了這段代碼。我發現我想要的東西很重。有一個更好的方法嗎?

package utils 

import "sync" 

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels 
type StringChannelBroadcaster struct { 
    Source  chan string 
    Subscribers map[string]*StringChannelSubscriber 
    stopChannel chan bool 
    mutex  sync.Mutex 
    capacity uint64 
} 

// NewStringChannelBroadcaster creates a StringChannelBroadcaster 
func NewStringChannelBroadcaster(capacity uint64) (b *StringChannelBroadcaster) { 
    return &StringChannelBroadcaster{ 
     Source:  make(chan string, capacity), 
     Subscribers: make(map[string]*StringChannelSubscriber), 
     capacity: capacity, 
    } 
} 

// Dispatch starts dispatching message 
func (b *StringChannelBroadcaster) Dispatch() { 
    b.stopChannel = make(chan bool) 
    for { 
     select { 
     case val, ok := <-b.Source: 
      if ok { 
       b.mutex.Lock() 
       for _, value := range b.Subscribers { 
        value.Channel <- val 
       } 
       b.mutex.Unlock() 
      } 
     case <-b.stopChannel: 
      return 
     } 
    } 
} 

// Stop stops the Broadcaster 
func (b *StringChannelBroadcaster) Stop() { 
    close(b.stopChannel) 
} 

// StringChannelSubscriber defines a subscriber to a StringChannelBroadcaster 
type StringChannelSubscriber struct { 
    Key  string 
    Channel chan string 
} 

// NewSubscriber returns a new subsriber to the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) NewSubscriber() *StringChannelSubscriber { 
    key := RandString(20) 
    newSubscriber := StringChannelSubscriber{ 
     Key:  key, 
     Channel: make(chan string, b.capacity), 
    } 
    b.mutex.Lock() 
    b.Subscribers[key] = &newSubscriber 
    b.mutex.Unlock() 

    return &newSubscriber 
} 

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) RemoveSubscriber(subscriber *StringChannelSubscriber) { 
    b.mutex.Lock() 
    delete(b.Subscribers, subscriber.Key) 
    b.mutex.Unlock() 
} 

謝謝

朱利安

+0

有時候,代碼感覺「沉重」,因爲低級別操作周圍沒有語法糖包裝。這對我來說似乎是一種正常的做法;你想看到什麼「更輕」? –

回答

1

我想你可以把它簡化一下:擺脫stopChannelStop方法。您可以關閉Source而不是調用Stop,並檢測Dispatch(ok將爲false)以退出(您可以實際在源通道範圍內)。

您可以擺脫Dispatch,並且只需在for循環中啓動NewStringChannelBroadcaster的goroutine,因此外部代碼無需單獨啓動調度循環。

您可以使用通道類型作爲映射鍵,因此您的映射可以變爲​​(空結構,因爲您不需要映射值)。因此,您的NewSubscriber可以採用通道類型參數(或創建一個新通道並返回),並將其插入地圖中,而不需要隨機字符串或StringChannelSubscriber類型。

我也做了一些改進,如關閉用戶渠道:

package main 

import "sync" 

import (
    "fmt" 
    "time" 
) 

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels 
type StringChannelBroadcaster struct { 
    Source  chan string 
    Subscribers map[chan string]struct{} 
    mutex  sync.Mutex 
    capacity uint64 
} 

// NewStringChannelBroadcaster creates a StringChannelBroadcaster 
func NewStringChannelBroadcaster(capacity uint64) *StringChannelBroadcaster { 
    b := &StringChannelBroadcaster{ 
     Source:  make(chan string, capacity), 
     Subscribers: make(map[chan string]struct{}), 
     capacity: capacity, 
    } 
    go b.dispatch() 
    return b 
} 

// Dispatch starts dispatching message 
func (b *StringChannelBroadcaster) dispatch() { 
    // for iterates until the channel is closed 
    for val := range b.Source { 
     b.mutex.Lock() 
     for ch := range b.Subscribers { 
      ch <- val 
     } 
     b.mutex.Unlock() 
    } 
    b.mutex.Lock() 
    for ch := range b.Subscribers { 
     close(ch) 
     // you shouldn't be calling RemoveSubscriber after closing b.Source 
     // but it's better to be safe than sorry 
     delete(b.Subscribers, ch) 
    } 
    b.Subscribers = nil 
    b.mutex.Unlock() 
} 

func (b *StringChannelBroadcaster) NewSubscriber() chan string { 
    ch := make(chan string, b.capacity) 
    b.mutex.Lock() 
    if b.Subscribers == nil { 
     panic(fmt.Errorf("NewSubscriber called on closed broadcaster")) 
    } 
    b.Subscribers[ch] = struct{}{} 
    b.mutex.Unlock() 

    return ch 
} 

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) RemoveSubscriber(ch chan string) { 
    b.mutex.Lock() 
    if _, ok := b.Subscribers[ch]; ok { 
     close(ch)     // this line does have to be inside the if to prevent close of closed channel, in case RemoveSubscriber is called twice on the same channel 
     delete(b.Subscribers, ch) // this line doesn't need to be inside the if 
    } 
    b.mutex.Unlock() 
} 

func main() { 
    b := NewStringChannelBroadcaster(0) 

    var toberemoved chan string 

    for i := 0; i < 3; i++ { 
     i := i 

     ch := b.NewSubscriber() 
     if i == 1 { 
      toberemoved = ch 
     } 
     go func() { 
      for v := range ch { 
       fmt.Printf("receive %v: %v\n", i, v) 
      } 
      fmt.Printf("Exit %v\n", i) 
     }() 
    } 

    b.Source <- "Test 1" 
    b.Source <- "Test 2" 
    // This is a race condition: the second reader may or may not receive the first two messages. 
    b.RemoveSubscriber(toberemoved) 
    b.Source <- "Test 3" 

    // let the reader goroutines receive the last message 
    time.Sleep(2 * time.Second) 

    close(b.Source) 

    // let the reader goroutines write close message 
    time.Sleep(1 * time.Second) 
} 

https://play.golang.org/p/X-NcikvbDM

編輯:我加你的編輯關閉Source後調用RemoveSubscriber當修復恐慌,但你不應該不要這樣做,你應該在通道關閉後讓結構和其中的所有內容都被垃圾回收。 如果在關閉Source之後調用NewSubscriber,我也增加了一個恐慌。以前你可以做到這一點,它會泄漏創建的頻道,大概是在那個頻道上永遠屏蔽的goroutine。

如果您可以在已關閉的廣播公司上致電NewSubscriber(或RemoveSubscriber),這可能意味着您的代碼在某處存在錯誤,因爲您堅持不應該是廣播公司。