2015-11-27 96 views
0

我在探索Go並試圖建立一種使用渠道的管道。我只想在main()中讀取一些內容並將它們發送到process()進行處理,在這種情況下,只需將值輸出到屏幕即可。使用渠道去管道

不幸的是,在下面的代碼中,似乎process()永遠不會從通道讀取數據,或者至少不會打印任何數據;我究竟做錯了什麼?

package main 

import ("fmt" ; "database/sql" ; _ "github.com/lib/pq" ; "time" ; "gopkg.in/redis.v3")//; "strconv") 

type Record struct { 
    userId, myDate int 
    prodUrl string 
} 


func main(){ 

    //connect to db 
    db, err := sql.Open(...) 
    defer db.Close() 

    //error check here... 

    //exec query 
    rows, err := db.Query("select userID,url,date from mytable limit 10") 
    defer rows.Close() 

    //error check here... 

    //create channel to buffer rows read 
    bufferChan := make(chan *Record,1000) 
    go process(bufferChan) 

    //iterate through results and send them to process() 
    row := new(Record) 
    for rows.Next(){ 
     err := rows.Scan(&row.userId, &row.prodUrl, &row.myDate)   
     bufferChan <- row 
     fmt.Printf("row sent %v",row.userId)      
    } 
} 

//prints Record values 
func process (buffer chan *Record) { 
    row := <- buffer 
    fmt.Printf("row received: %d %v %d ", row.userId,row.prodUrl,row.myDate) 
} 

回答

1

原因FUNC過程不打印任何東西是你FUNC主要退出for循環rows.Next完成後從而退出程序。你需要做幾件事情。

  1. 在for循環後添加呼叫以關閉以指示結束添加消息到 緩衝通道否則會導致死鎖。所以請致電 關閉(bufferChan)
  2. 使用範圍遍歷func進程中的通道。
  3. 通過一個額外的通道來處理知道它什麼時候結束,所以 主要可以等待進程結束。

看看下面的例子中的代碼片段:

package main 

import "fmt" 

func main() { 
    bufferChan := make(chan int, 1000) 
    done := make(chan bool) 
    go process(bufferChan, done) 
    for i := 0; i < 100; i++ { 
     bufferChan <- i 
    } 
    close(bufferChan) 

    select { 
    case <-done: 
     fmt.Println("Done") 
    } 

} 

func process(c chan int, done chan bool) { 
    for s := range c { 
     fmt.Println(s) 
    } 
    done <- true 

} 
+0

謝謝,這個工作,但現在看來,發送和接收順序執行,我是理解錯了?我的目標是讓process()與main()並行執行,並在bufferChan通道可用時使用數據,但似乎main()會執行整個for()循環,然後纔會執行process()開始打印消息。 – derelict

+1

是的,這是因爲只有一次去消耗通道bufferChan上的東西。看看這個例子https://gobyexample.com/worker-pools這就是你如何實現並行處理。 –

1

您的主函數退出以至於整個程序結束。它應該等待處理結束。而且,過程函數應該使用range關鍵字遍歷信道。

腳手架的工作解決方案看起來像這樣:

package main 

import "fmt" 

func process(input chan int, done chan struct{}) { 
    for i := range input { 
     fmt.Println(i) 
    } 
    done <- struct{}{} 
} 

func main() { 
    input := make(chan int) 
    done := make(chan struct{}) 

    go process(input, done) 

    for i := 1; i < 10; i++ { 
     input <- i 
    } 
    close(input) 

    <-done 
} 

Playground

1

我相信你正在尋找io.pipe()去API,它是作家和讀者/秒之間創建一個同步內存管。這裏沒有緩衝。它可用於連接期望io.Reader的代碼與期望io.Writer的代碼。

在你的情況下,io.PipeWriter是代碼「從數據庫讀取值」和「io.PipeReader」是代碼「將值寫入屏幕」。

這裏,沒有任何緩衝區的流數據的示例,即bytes.Buffer

// Set up the pipe to write data directly into the Reader. 
pr, pw := io.Pipe() 
// Write JSON-encoded data to the Writer end of the pipe. 
// Write in a separate concurrent goroutine, and remember 
// to Close the PipeWriter, to signal to the paired PipeReader 
// that we’re done writing. 
go func() { 
    err := json.NewEncoder(pw).Encode(&v) 
    pw.Close() 
}() 
// Send the HTTP request. Whatever is read from the Reader 
// will be sent in the request body. 
// As data is written to the Writer, it will be available 
// to read from the Reader. 
resp, err := http.Post(「example.com」, 「application/json」, pr) 

參考:

https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5