2015-02-11 121 views
0

我有一個golang服務器做這樣的事情: 包主要異步消息golang

func main() { 
    for { 
     c := listener.Accept() 
     go handle(c) 
    } 
} 

... 
func handle(c net.Conn) { 
    m := readMessage(c) // func(net.Conn)Message 
    r := processMessage(m) //func(Message)Result 
    sendResult(c, r)  // func(net.Conn,Result) 
} 

讀取和寫入同步消息。我現在需要的是通過一個給定的開放連接來異步發送消息,我知道一個通​​道可以被我使用。

這是我的想法:

... 
func someWhereElese(c chan Result) { 
    // generate a message and a result 
    r := createResultFromSomewhere() 
    c <- r // send the result through the channel 
} 

並修改我的把手使用同一通道,而不是

func handle(c net.Conn, rc chan Result) { 
    m := readMessage(c) // func(net.Conn)Message 
    r := processMessage(m) //func(Message)Result 
    //sendResult(c, r)  // func(net.Conn,Result) 
    rc <- r 
} 

這裏就是我的困惑所在。

結果通道應創建,它應該有一個連接發送到哪裏不管它接收

func doSend(c net.Con, rc chan Result) { 
    r := rc   // got a result channel 
    sendResult(c, r) // send it through the wire 
} 

但應該在哪裏該通道產生的呢?在主循環中?

func main() { 
    ... 
    for { 
     c := l.Accept() 
     rc := make(chan Result) 
     go doSend(c, rc) 
    } 
} 

怎麼樣讀?它應該進入它自己的頻道/ gorutine嗎? 如果我需要向n個客戶進行廣播,我是否應該保留一部分結果頻道?一片連接?

我在這裏很困惑,但我覺得我很接近。

+0

從這裏開始:http://talks.golang.org/2012/concurrency.slide。你的問題的關鍵是使用'select'來觀看一個傳入的連接頻道,一個響應頻道和一個退出頻道。根據您的預期負載,每個請求創建一個goroutine可能沒有問題;或者您可能需要創建一個池。但是一旦你通過Go Concurrency Patterns,你會更好地理解正確的問題。另見http://blog.golang.org/advanced-go-concurrency-patterns,特別是最後的「相關文章」。理解'select'是偉大的「哦,這就是Go的工作方式!」時刻。 – 2015-02-11 20:54:19

+0

@RobNapier呃...我之前通過幻燈片閱讀,我有點不明白。我會在一會兒再次通過他們。同時,我設法制作了一個小程序,如果您發現任何特別危險的事情,您能否對此發表評論? – OscarRyz 2015-02-11 22:04:50

+0

好的;我可能誤解了你的意思是「異步」。我曾假設多個請求和響應會發生在同一個連接上(交錯請求)。您似乎意味着您想在閱讀器獲取數據時傳輸數據。這可能比讀取兩個字節並在一個goroutine中寫入兩個字節更平行一些。你可能想看看'golang.org/x/text/transform'和'io.Copy()'。 (對不起,我對這裏的實際代碼沒有更多幫助;這是一個有趣的問題,但我沒有時間這一分鐘來幫助廣泛。) – 2015-02-11 22:55:26

回答

0

這個程序似乎解決我迫切的問題

package main 

import (
    "bytes" 
    "encoding/binary" 
    "log" 

    "net" 
) 

var rcs []chan int = make([]chan int,0) 


func main() { 
    a, e := net.ResolveTCPAddr("tcp", ":8082") 
    if e != nil { 
     log.Fatal(e) 
    } 
    l, e := net.ListenTCP("tcp", a) 
    for { 
     c, e := l.Accept() 
     if e != nil { 
      log.Fatal(e) 
     } 
     rc := make(chan int) 
     go read(c, rc) 
     go write(c, rc) 
     rcs = append(rcs, rc) 
     // simulate broacast 
     log.Println(len(rcs)) 
     if len(rcs) > 5 { 
      func() { 
       for _, v := range rcs { 
        log.Println("sending") 
        select { 
        case v <- 34: 
         log.Println("done sending") 
        default: 
         log.Println("didn't send") 
        } 
       } 
      }() 
     } 
    } 
} 
func read(c net.Conn, rc chan int) { 
    h := make([]byte, 2) 
    for { 
     _, err := c.Read(h) 
     if err != nil { 
      rc <- -1 
     } 
     var v int16 
     binary.Read(bytes.NewReader(h[:2]), binary.BigEndian, &v) 
     rc <- int(v) 
    } 
} 
func write(c net.Conn, rc chan int) { 
    for { 
     r := <-rc 
     o := []byte{byte(r * 2)} 
     c.Write(o) 
    } 
}