2017-08-25 26 views
0

我從google io 2010中獲取loadbalancer代碼,併爲優先級隊列和Balancer的同步鎖定添加了實現。我故意設置workFn函數的延遲大於requester,所以我可以看到待定值將如何增加。我在cli中運行它,注意到在所有工人啓動後,程序停止並等待所有工人的值爲並且什麼也沒有顯示。我無法弄清楚錯誤在哪裏,有時候completed只會調用一次或兩次。看起來<-b.done在選擇的情況下沒有正確處理。簡單負載均衡器無法正常工作

package main 

import (
    "container/heap" 
    "fmt" 
    "math/rand" 
    "os" 
    "sync" 
    "time" 
) 

var nWorker int32 = 6 

func main() { 
    rchanel := make(chan Request) 
    workers := Pool{ 
     {make(chan Request), 0, 0}, 
     {make(chan Request), 0, 1}, 
     {make(chan Request), 0, 2}, 
     {make(chan Request), 0, 3}, 
     {make(chan Request), 0, 4}, 
     {make(chan Request), 0, 5}, 
    } 
    doneChan := make(chan *Worker) 
    balancer := Balancer{workers, sync.Mutex{}, doneChan} 
    for _, elem := range workers { 
     go elem.work(doneChan) 
    } 
    go balancer.balance(rchanel) 
    go requester(rchanel) 

    var input string 
    fmt.Scanln(&input) 
} 

type Request struct { 
    fn func() int 
    c chan int 
} 

func requester(work chan Request) { 
    c := make(chan int) 
    for { 
     time.Sleep(time.Duration(rand.Int31n(nWorker)) * 2e4) 
     work <- Request{workFn, c} 
     go func() { 
      result := <-c 
      fmt.Fprintf(os.Stderr, "Done: %v \n", result) 
     }() 
    } 
} 

func workFn() int { 
    val := rand.Int31n(nWorker) 
    time.Sleep(time.Duration(val) * 2e8) 
    return int(val) 
} 

type Worker struct { 
    requests chan Request 
    pending int 
    index int 
} 

func (w *Worker) work(done chan *Worker) { 
    for { 
     req := <-w.requests 
     req.c <- req.fn() 
     done <- w 
    } 
} 

type Pool []*Worker 

func (p Pool) Less(i, j int) bool { 
    return p[i].pending < p[j].pending 
} 
func (p Pool) Swap(i, j int) { 
    p[i], p[j] = p[j], p[i] 
    p[i].index = i 
    p[j].index = j 
} 
func (p Pool) Len() int { return len(p) } 
func (p *Pool) Push(x interface{}) { 
    n := len(*p) 
    worker := x.(*Worker) 
    worker.index = n 
    *p = append(*p, worker) 
} 
func (p *Pool) Pop() interface{} { 
    old := *p 
    n := len(old) 
    item := old[n-1] 
    item.index = -1 
    *p = old[0 : n-1] 
    return item 
} 

type Balancer struct { 
    pool Pool 
    mu sync.Mutex 
    done chan *Worker 
} 

func (b *Balancer) dispatch(req Request) { 
    b.mu.Lock() 
    w := heap.Pop(&b.pool).(*Worker) 
    w.requests <- req 
    w.pending++ 
    heap.Push(&b.pool, w) 
    b.mu.Unlock() 
} 
func (b *Balancer) completed(w *Worker) { 
    b.mu.Lock() 
    w.pending-- 
    heap.Remove(&b.pool, w.index) 
    heap.Push(&b.pool, w) 
    b.mu.Unlock() 
} 

func (b *Balancer) balance(work chan Request) { 
    for { 
     select { 
     case req := <-work: 
      b.dispatch(req) 
      b.printStatus() 
     case w := <-b.done: 
      b.completed(w) 
      b.printStatus() 
     } 
    } 
} 

func (b *Balancer) printStatus() { 
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending) 
} 
+0

需要注意的是,這段代碼會在'Worker.work'上泄漏goroutines。首先觀察。仍在審查尋找問題的代碼。 – RayfenWindspear

回答

1

的問題是,balance()夠程最終被擋在dispatch()w.requests <- req在同一時間,具體Worker被阻斷work()done <- w,生產運行balance()的夠程死鎖。

這裏是你需要的修復。 balance()需要在內部使用goroutines。這將解決這個問題,因爲現在如果dispatch()completed()中的例程阻止無關緊要,balance()的主程序將從channel繼續selects。

注:這不適用於操場,因爲它永遠持續下去。

func (b *Balancer) balance(work chan Request) { 
    for { 
     select { 
     case req := <-work: 
      go func() { 
       b.dispatch(req) 
       b.printStatus() 
      }() 
     case w := <-b.done: 
      go func() { 
       b.completed(w) 
       b.printStatus() 
      }() 
     } 
    } 
} 

現在printStatus通話可以同時進行,它需要利用mutex的一樣好,甚至你會得到隨機panic秒。

func (b *Balancer) printStatus() { 
    b.mu.Lock() 
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending) 
    b.mu.Unlock() 
} 

現在,如果我可以弄清楚爲什麼pending值只是不斷增加......據我所知,應該Worker.work()永遠只允許pending01因爲Worker必須等待對done <- w然後才能從dispatch()獲得另一個Request。我相信這是理想的結果,但不是嗎?

+0

嗨@RayfenWindspear,感謝您的解釋,是有意增加了待處理的請求。但我仍然不明白爲什麼我們在'dispatch'裏面被阻塞,它是完全不同的通道,'done <-w'和'w.requests <-req',並且'select'在調度時不會再執行一次行動,我說得對嗎? –

+1

@MaksymVolodin正確的,只有一個例程在運行,它會阻止等待特定的工作人員檢索'Request',而工人被阻止嘗試'完成< - w'。實際上我不確定這是一個很好的例子,因爲很難找到涉及到這麼多渠道的事情,並且以不直觀的方式進行交互。我沒有必要去看一個頻道,只需要一遍又一遍地檢查它的定義,看看它究竟做了多少次。確實是一段艱難的代碼。這並不意味着它不正確。 – RayfenWindspear