2015-12-20 39 views
2

我有5個巨大的(每個400萬行)日誌文件,我目前在Perl中處理,我想我可能會嘗試在Go中執行同樣的功能。所以,在Go中非常缺乏經驗,我正在考慮如下操作。任何意見的方法將不勝感激。 一些粗糙的僞代碼:Golang:同時處理5個大文件

var wg1 sync.WaitGroup 
var wg2 sync.WaitGroup 

func processRow (r Row) { 
    wg2.Add(1) 
    defer wg2.Done() 
    res = <process r> 
    return res 
} 

func processFile(f File) { 
    wg1.Add(1) 
    open(newfile File) 
    defer wg1.Done() 
    line = <row from f> 
    result = go processRow(line) 
    newFile.Println(result) // Write new processed line to newFile 
    wg2.Wait() 
    newFile.Close() 

} 

func main() { 

    for each f logfile { 
     go processFile(f) 
    } 
    wg1.Wait() 
} 

所以,想法是,我同時,然後將每個文件中的所有行會反過來也可以同時處理處理這5個文件。

這會工作嗎?

+3

對於I/O綁定任務,您可能無法從CPU併發性中獲得太多。你也可能想看看頻道。 – pvg

+1

「過程r」是什麼?如果它是相對「容易」的東西,那麼可能不值得在單獨的gorutine中進行 - 開銷大於增益。另外,結果是,你會有一個「日誌數據集」(即所有5個文件合併爲一個)還是會有5個不同的結果集? – ain

+0

@ain我的理解是它必須是非常實體的。 goroutines的開銷很低。 –

回答

6

您一定要使用渠道來管理您處理的行。或者,你也可以寫另一個goroutine來處理你的輸出。

var numGoWriters = 10 

func processRow(r Row, ch chan<- string) { 
    res := process(r) 
    ch <- res 
} 

func writeRow(f File, ch <-chan string) { 
    w := bufio.NewWriter(f) 
    for s := range ch { 
     _, err := w.WriteString(s + "\n") 
    } 

func processFile(f File) { 
    outFile, err := os.Create("/path/to/file.out") 
    if err != nil { 
     // handle it 
    } 
    defer outFile.Close() 
    var wg sync.WaitGroup 
    ch := make(chan string, 10) // play with this number for performance 
    defer close(ch) // once we're done processing rows, we close the channel 
        // so our worker threads exit 
    fScanner := bufio.NewScanner(f) 
    for fScanner.Scan() { 
     wg.Add(1) 
     go func() { 
      processRow(fScanner.Text(), ch) 
      wg.Done() 
     }() 
    } 
    for i := 0; i < numGoWriters; i++ { 
     go writeRow(outFile, ch) 
    } 
    wg.Wait() 
} 

下面我們就processRow做所有的處理(我假設爲string),writeRow做所有的出I/O,以及processFile搭售每個文件一起。然後,所有main所要做的就是交出文件,產生公用程序,等瞧

func main() { 
    var wg sync.WaitGroup 

    filenames := [...]string{"here", "are", "some", "log", "paths"} 
    for fname := range filenames { 
     inFile, err := os.Open(fname) 
     if err != nil { 
      // handle it 
     } 
     defer inFile.Close() 
     wg.Add(1) 
     go processFile(inFile) 
    } 
    wg.Wait() 
+0

非常感謝您的支持! – Kosie