2015-05-01 27 views
2

我有一個腳本,它從數據庫中選擇一些數據併發送到一個通道,由多個goroutines處理,然後將結果發送回主線程以在數據庫上更新。去渠道:如何使這個非阻塞?

但是,它將數據發送到第一個通道時掛起(可能阻塞)。

var chin = make(chan in) 
var chout = make(chan out) 

inout均爲structs

首先夠程被啓動:

for i:=0; i<5; i++ { 
    go worker() 
} 

加載信道的代碼然後

的信道與全局創建的:

 if verbose { 
      fmt.Println(`Getting nextbatch2 and sending to workers`) 
     } 

     rows, err = nextbatch2.Query() 
     if err != nil { 
      panic(err) 
     } 

     var numtodo int 
     for rows.Next() { 
      err = rows.Scan(&id, &data) 
      if err != nil { 
       rows.Close() 
       panic(err) 
      } 

      // Start 
      var vin in 
      vin.id = id 
      vin.data = data 
      chin <- vin 
      numtodo++ 
     } 
     rows.Close() 

然後後立即這樣:

 if verbose { 
      fmt.Println(`Processing out channel from workers`) 
     } 

     for res := range chout { 
      update5.Exec(res.data, res.id) 
      if numtodo--; numtodo == 0 { 
       break 
      } 
     } 

,並在後臺多個worker()夠程運行:打印Getting nextbatch2 and sending to workers

func worker() { 
     for res := range chin { 
      var v out 
      v.id = res.id 
      v.data = process(res.data) 
      chout <- v 
     } 
} 

此代碼掛起。它永遠不會到Processing out channel from workers。所以它掛在rows.Next()循環中的某個地方,對此我找不到原因,因爲chin通道應該是非阻塞的 - 即使worker() goroutines沒有處理它,它仍應該至少完成該循環。

任何想法?

編輯:

通過增加對fmt.Println(" on", numtodo)rows.Next()循環結束時,我可以看到,它的塊5之後,我不明白,因爲它應該是非阻塞的,對不對?

編輯2:

通過改變信道到make(chan in/out, 100)現在將阻塞後105

回答

3

接收機總是阻塞,直到有數據要接收。如果通道未經緩衝,則發送器阻塞,直到接收器收到該值。

https://golang.org/doc/effective_go.html#channels

所以你可以重寫你的消費者的代碼是這樣的:

go func(){ 
    for res := range chout { 
     update5.Exec(res.data, res.id) 
    } 
}() 

還需要close(chin)close(chout)range語句的正確用法。

+0

我不明白你的意思,它在此之前阻止。問題不在於接收器阻塞,接收器是爲了阻止。問題是'chin'頻道阻塞。而且,它們並不需要關閉,因爲整個事情都是無限循環的,儘管你在這裏看不到這些代碼。 – Alasdair

+1

您的'worker()'不能發送到'chout',因爲沒有goroutine讀取'chout'。 –

+0

它不僅僅是在隊列中收集嗎? – Alasdair