2017-10-04 113 views
4

我有我正在刮的網址列表。我想要做的是將所有成功抓取的頁面數據存儲到一個通道中,當我完成時,將其轉儲到一個切片中。我不知道我會得到多少成功的提取,所以我不能指定一個固定的長度。我預計代碼將達到wg.Wait(),然後等到所有的wg.Done()方法被調用,但我從未達到close(queue)聲明。尋找一個類似的答案,我碰到這個來得如此回答爲什麼我的代碼在goroutine中運行wg.Wait()時工作正常?

https://stackoverflow.com/a/31573574/5721702

在這裏筆者有類似的功能:

ports := make(chan string) 
toScan := make(chan int) 
var wg sync.WaitGroup 

// make 100 workers for dialing 
for i := 0; i < 100; i++ { 
    wg.Add(1) 
    go func() { 
     defer wg.Done() 
     for p := range toScan { 
      ports <- worker(*host, p) 
     } 
    }() 
} 

// close our receiving ports channel once all workers are done 
go func() { 
    wg.Wait() 
    close(ports) 
}() 

只要我包裹着我的wg.Wait()的夠程內,close(queue)達到:

urls := getListOfURLS() 
activities := make([]Activity, 0, limit) 
queue := make(chan Activity) 
for i, activityURL := range urls { 
    wg.Add(1) 
    go func(i int, url string) { 
     defer wg.Done() 
     activity, err := extractDetail(url) 
     if err != nil { 
      log.Println(err) 
      return 
     } 
     queue <- activity 
    }(i, activityURL) 
} 
    // calling it like this without the goroutine causes the execution to hang 
// wg.Wait() 
// close(queue) 

    // calling it like this successfully waits 
go func() { 
    wg.Wait() 
    close(queue) 
}() 
for a := range queue { 
    // block channel until valid url is added to queue 
    // once all are added, close it 
    activities = append(activities, a) 
} 

爲什麼這些代碼並沒有達到close如果我不使用股份公司oroutine for wg.Wait()?我認爲所有的defer wg.Done()聲明都會被調用,所以最終它會清除,因爲它會到達wg.Wait()。它是否與在我的頻道中接收值有關?

+0

你能展示整個代碼嗎? –

+0

默認情況下,通道是無緩衝的,這意味着如果有相應的接收準備好接收發送的值,它們將只接受發送。因此,當您將活動發送到隊列時,沒有相應的接收,並且它被阻止。所有例程被阻止,最後wg.wait會掛起。有一個概念叫做緩衝通道,可以用來緩衝多個輸入。通過https://gobyexample.com/channel-buffering去瀏覽 –

回答

4

您需要等待goroutines完成一個單獨的線程,因爲queue需要讀取。當你做到以下幾點:

queue := make(chan Activity) 
for i, activityURL := range urls { 
    wg.Add(1) 
    go func(i int, url string) { 
     defer wg.Done() 
     activity, err := extractDetail(url) 
     if err != nil { 
      log.Println(err) 
      return 
     } 
     queue <- activity // nothing is reading data from queue. 
    }(i, activityURL) 
} 

wg.Wait() 
close(queue) 

for a := range queue { 
    activities = append(activities, a) 
} 

queue <- activity每個夠程塊,因爲queue是無緩衝,並沒有什麼從中讀取數據。這是因爲queue上的範圍循環在wg.Wait之後的主線程中。

wg.Wait只會在所有goroutine返回時解鎖。但是如前所述,所有的goroutines在頻道發送時都被阻止。

當您使用單獨的goroutine等待時,代碼執行實際上到達queue上的範圍循環。

// wg.Wait does not block the main thread. 
go func() { 
    wg.Wait() 
    close(queue) 
}() 

這導致了夠程在queue <- activity聲明解封(主線程開始讀取關閉queue)和運行直到完成。而後者又調用每個人wg.Done

一旦等待的例程通過wg.Wait,關閉queue,主線程退出範圍循環。

+0

鑑於你的解釋很有意義,但是隻有在閱讀之後,這讓我很想知道,我寫它的方式是寫它的最好方法嗎?我不打算按照這種方式工作,所以我擔心代碼有點複雜,而且不太易讀。 –

+0

這幾乎是如何處理這樣的問題。有些人更喜歡完全依賴渠道,但等待組織沒有任何問題。至於可讀性,這個問題可能與理解代碼流程有關。這是您在使用該語言時更熟悉的事情。 – abhink

1

queue通道沒有緩衝,所以每個試圖寫入它的goroutine都會被阻塞,因爲讀取器進程尚未啓動。所以沒有goroutinte可以寫,他們都掛起 - 結果wg.Wait永遠等待。 嘗試在一個單獨的goroutine推出讀者:

go func() { 
    for a := range queue { 
     // block channel until valid url is added to queue 
     // once all are added, close it 
     activities = append(activities, a) 
    } 
}() 

,然後開始服務員:

wg.Wait() 
close(queue) 

這樣你就不能積累在通道中的所有數據和過載,但得到的數據,因爲它來到目標切片。

+0

你的意思是先打電話給讀者,然後在沒有門廳的情況下給服務員打電話?我試過這個,它在我的slice中返回了3個結果,而不是預期的4.我懷疑它是在到達所有wg.Done()之後過早地關閉隊列並且之前返回slice最後一項可以追加。 –

+0

是的。在這種情況下,您需要等到閱讀器完成,否則最後的記錄可能會丟失。 –

相關問題