2016-02-11 71 views
0

我想收聽多個傳輸編碼響應的HTTP流,然後逐行讀取消息,然後將消息推送到一個通道。然後我想從頻道中讀取並稍後通過websocket。HTTP分塊流式傳輸到WebSocket

func subscribe(ws chan<- string, group string) (scanner *bufio.Scanner, err error){ 
    res, _ := req(STREAM_URL, channelTemplate(group)) 
    reader := bufio.NewScanner(res.Body) 
    return reader, reader.Err() 
} 

func main() { 
    ws := make(chan string) 
    request, _ := http.NewRequest("GET", URL, nil) 
    request.Header.Add("Content-Type", "application/json") 
    client := &http.Client{} 
    resp, _ := client.Do(request) 
    ac := ACResponse{} 
    json.NewDecoder(resp.Body).Decode(&ac) 
    resp.Body.Close() 
    var scanners = make([]*bufio.Scanner, 0) 
    for _, group := range ac.Groups { 
     fmt.Println("Started worker for", group) 
     //listen to all stream URLs 
     scanner, err := subscribe(ws, group) 
     if err != nil { 
      panic(err) 
     } 
     // keep track of Scanner to read later 
     scanners = append(scanners, scanner) 
    } 
    for { 
     select { 
     case msg := <-ws: 
      fmt.Println("[events] ", msg) 
     default: 
      randScanner := rand.Intn(len(ac.Groups)-1) 
      fmt.Println("Reading from", randScanner) 
      reader := scanners[randScanner] 
      reader.Scan() 
      if err := reader.Err(); err != nil { 
       panic(err) 
      } 
      text := reader.Text() 
      ws <- text 
     } 
    } 
} 

該程序阻止在reader.Scan()。輸出是Reading from 1,沒有別的。我看着wireshark,消息正在通過。

我怎樣才能更好地使用Go來設計這個問題?

+0

標題中提到的websocket代碼在哪裏?爲了調試這個,通過發送一個SIQUIT進程來打印goroutine棧。這可能會讓你對程序停滯的地方有所瞭解。 –

+0

交叉發佈在這裏:https://groups.google.com/d/topic/golang-nuts/dQu1AU38F8Y/discussion – JimB

+0

尚未編寫websocket代碼,我只想確認它與stdout的工作。 – viperfx

回答

0

發送到無緩衝通道的主塊ws。要解決此問題,更改ws到緩衝信道:

ws := make(chan string, 1) 

的第二個問題是,main()中後繼續它們達到EOF閱讀掃描器。問題出現在這些行上:

 reader.Scan() 
     if err := reader.Err(); err != nil { 
      panic(err) 
     } 
     text := reader.Text() 

Scan()在EOF處返回false,但忽略來自掃描的返回值。 Err()在EOF上返回nil。修改應用程序以使用Scan()中的返回值。

還有一個問題是讀取任何一個掃描儀的主要塊。以避免阻塞在單個連接上,啓動一個的goroutine讀取每個連接:

func subscribe(wg *sync.WaitGroup, ws chan<- string, group string) { 
    defer wg.Done() 
    res, err := req(STREAM_URL, channelTemplate(group)) 
    if err ! nil { 
     // handle error 
    } 
    defer resp.Body.Close() 
    reader := bufio.NewScanner(res.Body) 
    for reader.Scan() { 
     ws <- reader.Text() 
    } 
    if err := reader.Err(); err != nil { 
     // handle error 
    } 
} 

func main() { 
    ws := make(chan string) 
    request, _ := http.NewRequest("GET", URL, nil) 
    request.Header.Add("Content-Type", "application/json") 
    resp, err := http.DefaultClient.Do(request) 
    if err != nil { 
     // handle error 
    } 
    var ac ACResponse 
    if err := json.NewDecoder(resp.Body).Decode(&ac); err != nil { 
     // handle error 
    } 
    resp.Body.Close() 
    var wg sync.WaitGroup 
    for _, group := range ac.Groups { 
     wg.Add(1) 
     go subscribe(&wg, ws, group) 
    } 

    go func() { 
     wg.Wait() 
     close(ws) 
    }() 

    for msg := range ws { 
     fmt.Println("[events] ", msg) 
    } 
} 

上面的代碼是未編譯的和未經測試。我標記了需要錯誤處理的地方。在所有連接到達EOF後,我編寫了代碼以退出main。這可能是也可能不是你想要的應用程序。