我有一個腳本,它從數據庫中選擇一些數據併發送到一個通道,由多個goroutines處理,然後將結果發送回主線程以在數據庫上更新。去渠道:如何使這個非阻塞?
但是,它將數據發送到第一個通道時掛起(可能阻塞)。
var chin = make(chan in)
var chout = make(chan out)
in
和out
均爲structs
首先夠程被啓動:
for i:=0; i<5; i++ {
go worker()
}
加載信道的代碼然後
的信道與全局創建的:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
然後後立即這樣:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
,並在後臺多個worker()
夠程運行:打印Getting nextbatch2 and sending to workers
後
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
此代碼掛起。它永遠不會到Processing out channel from workers
。所以它掛在rows.Next()
循環中的某個地方,對此我找不到原因,因爲chin
通道應該是非阻塞的 - 即使worker()
goroutines沒有處理它,它仍應該至少完成該循環。
任何想法?
編輯:
通過增加對fmt.Println(" on", numtodo)
在rows.Next()
循環結束時,我可以看到,它的塊5之後,我不明白,因爲它應該是非阻塞的,對不對?
編輯2:
通過改變信道到make(chan in/out, 100)
現在將阻塞後105
我不明白你的意思,它在此之前阻止。問題不在於接收器阻塞,接收器是爲了阻止。問題是'chin'頻道阻塞。而且,它們並不需要關閉,因爲整個事情都是無限循環的,儘管你在這裏看不到這些代碼。 – Alasdair
您的'worker()'不能發送到'chout',因爲沒有goroutine讀取'chout'。 –
它不僅僅是在隊列中收集嗎? – Alasdair