2017-09-13 77 views
0

我最近在玩 Go,寫了一個小腳本來解析日誌文件並將它們插入到 Elasticsearch 中。對於每一個文件,我都啟動了這樣一個 goroutine,併發地解析文件並插入到 Elasticsearch 中:

var wg := sync.WaitGroup{} 
wg.Add(len(files)) 
for _, file := range files { 
    go func(f os.FileInfo){ 
     defer wg.Done() 
     ProcessFile(f.Name(), config.OriginFilePath, config.WorkingFilePath, config.ArchiveFilePath,fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),config.ProviderIndex, config.NetworkData) 
    }(file) 
} 
wg.Wait() 

在我的 ProcessFile 裏面,有一個負責發送到 Elasticsearch 的函數:

// BulkInsert posts the given lines as a single _bulk request to Elasticsearch.
// Each element of lines must be a valid bulk-API action or document line; the
// bulk endpoint requires the body to end with a newline, appended here.
// Returns the raw response, or nil and the request error.
func BulkInsert(lines []string, ES *elastic.Client) (*elastic.Response, error) {
	body := strings.Join(lines, "\n") + "\n"
	resp, err := ES.PerformRequest("POST", "/_bulk", url.Values{}, body)
	if err != nil {
		return nil, err
	}
	return resp, nil
}

問題是,我並不完全瞭解 goroutine 的工作方式。我的理解是,發送到 Elasticsearch 會阻塞我其中一個 goroutine 的執行。於是我嘗試用同樣的方式,再啟動一個 goroutine 來做批量插入:

也就是在函數返回之前,用 WaitGroup 執行 `wg.Add(1)`、`go func(){ defer wg.Done(); BulkInsert(elems, ES) }()`,再 `wg.Wait()`。然而,我發現最後並不是所有的事件都進入了 Elasticsearch。我認爲這是因爲 goroutine 在批量請求發送/完成之前就返回了。

我的問題是,我的方法是否正確?我能取得更好的表現嗎?

回答

1

我可以取得更好的效果嗎?

不清楚,它取決於接收方和發送方的能力。

我的問題是,我的方法是否正確?

這可能會幫助你更好地理解 goroutine:

package main 

import (
    "fmt" 
    "log" 
    "net/http" 
    "sync" 
    "time" 
) 

// main demonstrates three ways of fanning out HTTP requests with goroutines:
// unbounded (one goroutine per job), semaphore-limited inline, and
// semaphore-limited via the reusable parallel() helper.
func main() {

	addr := "127.0.0.1:2074"

	// Demo server: each request sleeps one second before answering, so the
	// interleaving of "hit"/"done" log lines shows how many requests are
	// actually running concurrently in each scenario below.
	srv := http.Server{
		Addr: addr,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			log.Println("hit ", r.URL.String())
			<-time.After(time.Second)
			log.Println("done ", r.URL.String())
		}),
	}
	fail(unblock(srv.ListenAndServe))

	jobs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

	// get issues one GET and closes the response body on success so the HTTP
	// transport can reuse the connection (the original example leaked the
	// body). Errors are deliberately ignored in this demo, but we must not
	// touch a nil response.
	get := func(rawurl string) {
		resp, err := http.Get(rawurl)
		if err != nil {
			return
		}
		resp.Body.Close()
	}

	// case 1
	// one goroutine per job: all 10 requests are in flight at once.
	{
		wg := sync.WaitGroup{}
		wg.Add(len(jobs))
		log.Printf("starting %v jobs\n", len(jobs))
		for _, job := range jobs {
			// job passed as an argument to avoid loop-variable capture
			// (required before Go 1.22).
			go func(job int) {
				defer wg.Done()
				get(fmt.Sprintf("http://%v/job/%v", addr, job))
			}(job)
		}
		wg.Wait()
		log.Printf("done %v jobs\n", len(jobs))
	}

	log.Println()
	log.Println("=================")
	log.Println()

	// case 2
	// a buffered "limit" channel acts as a semaphore: at most 3 requests
	// are in flight at any time.
	{
		wg := sync.WaitGroup{}
		wg.Add(len(jobs))
		in := make(chan string)
		limit := make(chan bool, 3)
		log.Printf("starting %v jobs\n", len(jobs))
		go func() {
			for url := range in {
				limit <- true // blocks while 3 requests are already running
				go func(url string) {
					defer wg.Done()
					get(url)
					<-limit // release one semaphore slot
				}(url)
			}
		}()
		for _, job := range jobs {
			in <- fmt.Sprintf("http://%v/job/%v", addr, job)
		}
		close(in) // let the dispatcher goroutine exit (the original leaked it)
		wg.Wait()
		log.Printf("done %v jobs\n", len(jobs))
	}

	log.Println()
	log.Println("=================")
	log.Println()

	// case 2: rewrite
	// the same semaphore pattern packaged as a reusable helper, here capped
	// at 6 concurrent requests.
	{
		wait, add := parallel(6)
		log.Printf("starting %v jobs\n", len(jobs))
		for _, job := range jobs {
			url := fmt.Sprintf("http://%v/job/%v", addr, job)
			add(func() {
				get(url)
			})
		}
		wait()
		log.Printf("done %v jobs\n", len(jobs))
	}
}

// parallel returns a (wait, add) pair implementing a bounded worker pool:
// add(block) schedules block to run on its own goroutine, but at most c
// blocks execute concurrently; wait() blocks until every added block has
// finished. add must not be called after wait has returned.
func parallel(c int) (func(), func(block func())) {
	wg := sync.WaitGroup{}
	in := make(chan func())
	limit := make(chan bool, c) // semaphore: one slot per allowed concurrent block
	go func() {
		for block := range in {
			limit <- true // blocks until a slot is free
			go func(block func()) {
				defer wg.Done()
				block()
				<-limit // release the slot
			}(block)
		}
	}()
	wait := func() {
		wg.Wait()
		// Close in so the dispatcher goroutine above terminates; the
		// original version leaked it for the life of the process.
		close(in)
	}
	return wait, func(block func()) {
		wg.Add(1) // счёт before the send so wait() cannot miss this block
		in <- block
	}
}

// unblock runs block on its own goroutine and gives it one millisecond to
// fail fast (e.g. a server that cannot bind its port). An error returned
// within that window is reported; otherwise unblock returns nil and block
// keeps running in the background.
func unblock(block func() error) error {
	// Buffered so the goroutine can deliver a late result without blocking
	// forever once the timeout branch wins (the original leaked that
	// goroutine on an unbuffered channel).
	w := make(chan error, 1)
	go func() { w <- block() }()
	select {
	case err := <-w:
		return err
	case <-time.After(time.Millisecond):
	}
	return nil
}

// fail aborts the program with a panic when err is non-nil; it is a
// convenience guard for unrecoverable setup errors.
func fail(err error) {
	if err == nil {
		return
	}
	panic(err)
}

輸出

$ go run main.go 
2017/09/14 01:30:50 starting 10 jobs 
2017/09/14 01:30:50 hit /job/0 
2017/09/14 01:30:50 hit /job/4 
2017/09/14 01:30:50 hit /job/5 
2017/09/14 01:30:50 hit /job/2 
2017/09/14 01:30:50 hit /job/9 
2017/09/14 01:30:50 hit /job/1 
2017/09/14 01:30:50 hit /job/3 
2017/09/14 01:30:50 hit /job/7 
2017/09/14 01:30:50 hit /job/8 
2017/09/14 01:30:50 hit /job/6 
2017/09/14 01:30:51 done /job/5 
2017/09/14 01:30:51 done /job/4 
2017/09/14 01:30:51 done /job/2 
2017/09/14 01:30:51 done /job/0 
2017/09/14 01:30:51 done /job/6 
2017/09/14 01:30:51 done /job/9 
2017/09/14 01:30:51 done /job/1 
2017/09/14 01:30:51 done /job/3 
2017/09/14 01:30:51 done /job/7 
2017/09/14 01:30:51 done /job/8 
2017/09/14 01:30:51 done 10 jobs 
2017/09/14 01:30:51 
2017/09/14 01:30:51 ================= 
2017/09/14 01:30:51 
2017/09/14 01:30:51 starting 10 jobs 
2017/09/14 01:30:51 hit /job/0 
2017/09/14 01:30:51 hit /job/2 
2017/09/14 01:30:51 hit /job/1 
2017/09/14 01:30:52 done /job/2 
2017/09/14 01:30:52 done /job/0 
2017/09/14 01:30:52 done /job/1 
2017/09/14 01:30:52 hit /job/3 
2017/09/14 01:30:52 hit /job/4 
2017/09/14 01:30:52 hit /job/5 
2017/09/14 01:30:53 done /job/3 
2017/09/14 01:30:53 done /job/4 
2017/09/14 01:30:53 done /job/5 
2017/09/14 01:30:53 hit /job/6 
2017/09/14 01:30:53 hit /job/7 
2017/09/14 01:30:53 hit /job/8 
2017/09/14 01:30:54 done /job/6 
2017/09/14 01:30:54 done /job/7 
2017/09/14 01:30:54 done /job/8 
2017/09/14 01:30:54 hit /job/9 
2017/09/14 01:30:55 done /job/9 
2017/09/14 01:30:55 done 10 jobs 
2017/09/14 01:30:55 
2017/09/14 01:30:55 ================= 
2017/09/14 01:30:55 
2017/09/14 01:30:55 starting 10 jobs 
2017/09/14 01:30:55 hit /job/0 
2017/09/14 01:30:55 hit /job/1 
2017/09/14 01:30:55 hit /job/4 
2017/09/14 01:30:55 hit /job/2 
2017/09/14 01:30:55 hit /job/3 
2017/09/14 01:30:55 hit /job/5 
2017/09/14 01:30:56 done /job/0 
2017/09/14 01:30:56 hit /job/6 
2017/09/14 01:30:56 done /job/1 
2017/09/14 01:30:56 done /job/2 
2017/09/14 01:30:56 done /job/4 
2017/09/14 01:30:56 hit /job/7 
2017/09/14 01:30:56 done /job/3 
2017/09/14 01:30:56 hit /job/9 
2017/09/14 01:30:56 hit /job/8 
2017/09/14 01:30:56 done /job/5 
2017/09/14 01:30:57 done /job/6 
2017/09/14 01:30:57 done /job/7 
2017/09/14 01:30:57 done /job/9 
2017/09/14 01:30:57 done /job/8 
2017/09/14 01:30:57 done 10 jobs 
+0

感謝例子。據我瞭解,我需要第一個例子:我產生儘可能多的goroutines(根據長度'files'列表長度),並使用相同的方法:'wg.Add(1)'和'wg.Wait()'。 但是我還是不明白,爲什麼彈性搜索批量插入函數的inner'wg'不會阻塞外部goroutine。我需要通過引用傳遞'wg'以等待'BulkInsert'函數返回? – Disciples

+0

'wg'不能被複制,如果需要傳遞它,請使用指針。 wg不是一個通道,它不組織併發,它只會告訴它什麼時候完成,它只會在Wait()調用時阻塞。在沒有閱讀的情況下,您無法真正瞭解您的特定批量代碼。 –