0
我最近在玩Go,想出了一個小腳本來解析日誌文件並將它們插入到彈性搜索中。對於每一個文件我產生了這樣一個夠程:併發文件解析並插入到彈性搜索中
var wg := sync.WaitGroup{}
wg.Add(len(files))
for _, file := range files {
go func(f os.FileInfo){
defer wg.Done()
ProcessFile(f.Name(), config.OriginFilePath, config.WorkingFilePath, config.ArchiveFilePath,fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),config.ProviderIndex, config.NetworkData)
}(file)
}
wg.Wait()
裏面我processFile我有功能,發送到彈性搜索:
func BulkInsert(lines []string, ES *elastic.Client) (*elastic.Response, error){
r, err := ES.PerformRequest("POST", "/_bulk", url.Values{}, strings.Join(lines, "\n")+"\n")
if err != nil {
return nil, err
}
return r, nil
}
的問題是,我不完全瞭解工作夠程。我的理解是發送到彈性搜索會阻止我的一個goroutines執行。我試圖產卵另一個夠程爲與相同的方式批量插入彈性搜索:
WaitGroup
,go func(){defer wg.Done(); BulkInsert(elems, ES);}()
和wg.Wait()
我之前函數返回。然而,我發現最後並不是所有的事件都以彈性搜索結束。我認爲這是由於goroutines沒有發送/等待批量請求完成而返回。
我的問題是,我的方法是否正確?我能取得更好的表現嗎?
感謝例子。據我瞭解,我需要第一個例子:我產生儘可能多的goroutines(根據長度'files'列表長度),並使用相同的方法:'wg.Add(1)'和'wg.Wait()'。 但是我還是不明白,爲什麼彈性搜索批量插入函數的inner'wg'不會阻塞外部goroutine。我需要通過引用傳遞'wg'以等待'BulkInsert'函數返回? – Disciples
'wg'不能被複制,如果需要傳遞它,請使用指針。 wg不是一個通道,它不組織併發,它只會告訴它什麼時候完成,它只會在Wait()調用時阻塞。在沒有閱讀的情況下,您無法真正瞭解您的特定批量代碼。 –