2015-12-22 184 views
0

我有一個很大的日誌文件,想要並行分析它。如何等待所有 goroutine 執行完畢?

我有這樣的代碼:

package main 

import (
    "bufio" 
    "fmt" 
    "os" 
    "time" 
) 

// main opens log.txt, starts a pool of parser goroutines that read lines from
// a shared channel, and starts one producer goroutine that feeds the file's
// lines into that channel.
//
// The final loop is only a stand-in for "wait until every worker is done": it
// never exits, so the process must be stopped externally, and the deferred
// Close therefore never runs.
func main() {
	const path = "log.txt"
	workers := 10

	// Open the log file; give up immediately if it cannot be opened.
	f, openErr := os.Open(path)
	if openErr != nil {
		fmt.Println("Could not open file with the database.")
		os.Exit(1)
	}
	defer f.Close()

	// Unbuffered channel carrying raw log lines from the producer to the workers.
	lines := make(chan string)

	// Start the workers; each one takes lines off the channel and parses them.
	for w := 0; w < workers; w++ {
		go parseStrings(lines)
	}

	// A single producer goroutine pushes the file's lines into the channel.
	go getStrings(bufio.NewScanner(f), lines)

	// Placeholder wait: at this point we would like to block until all workers
	// have finished, but this loop simply sleeps forever.
	for {
		time.Sleep(time.Second)
	}
}

// getStrings sends every line produced by scanner to tasks, in input order.
//
// It does not close tasks; the caller owns the channel's lifetime.
//
// Scan returns false both at end of input and on a read error (for example a
// line longer than bufio.MaxScanTokenSize). Without checking scanner.Err the
// second case would look like a normal EOF and silently truncate the log, so
// the error is reported on stderr.
func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
	for scanner.Scan() {
		tasks <- scanner.Text()
	}
	if err := scanner.Err(); err != nil {
		fmt.Fprintln(os.Stderr, "reading log:", err)
	}
}

// parseStrings is a worker: it parses each line received from tasks and prints
// the result. It returns once tasks has been closed and drained.
//
// Ranging over the channel replaces the original endless `s := <-tasks` loop.
// A receive from a closed channel yields "" immediately, so that loop would
// spin forever printing empty events after close and could never finish.
func parseStrings(tasks <-chan string) {
	for s := range tasks {
		event := parseLine(s)
		fmt.Println(event)
	}
}

// parseLine turns one raw log line into an event string. It is currently a
// pass-through placeholder that returns the line unchanged.
func parseLine(line string) string {
	event := line
	return event
}

實際上,我該如何等待所有線程(goroutine)結束?有人建議我創建一個單獨的線程來匯總結果,但具體該怎麼做?

+2

https://golang.org/pkg/sync/#WaitGroup? –

+0

@Ainar-G 我不明白該如何正確地使用它 –

回答

1

使用管道(pipeline)模式,配合「扇出/扇入」(fan-out/fan-in)模式:

package main 

import (
    "bufio" 
    "fmt" 
    "strings" 
    "sync" 
) 

func main() { 
    file := "here is first line\n" + 
     "here is second line\n" + 
     "here is line 3\n" + 
     "here is line 4\n" + 
     "here is line 5\n" + 
     "here is line 6\n" + 
     "here is line 7\n" 
    scanner := bufio.NewScanner(strings.NewReader(file)) 

    // all lines onto one channel 
    in := getStrings(scanner) 

    // FAN OUT 
    // Multiple functions reading from the same channel until that channel is closed 
    // Distribute work across multiple functions (ten goroutines) that all read from in. 
    xc := fanOut(in, 10) 

    // FAN IN 
    // multiplex multiple channels onto a single channel 
    // merge the channels from c0 through c9 onto a single channel 
    for n := range merge(xc) { 
     fmt.Println(n) 
    } 
} 

// getStrings starts a goroutine that sends each line read by scanner on the
// returned channel and closes the channel once scanning stops.
//
// NOTE(review): scanner.Err() is not checked, so a read error (e.g. a line
// longer than bufio.MaxScanTokenSize) ends the stream early without notice.
func getStrings(scanner *bufio.Scanner) <-chan string {
	lines := make(chan string)
	go func() {
		defer close(lines)
		for scanner.Scan() {
			lines <- scanner.Text()
		}
	}()
	return lines
}

// fanOut starts n parser stages that all read from the same input channel and
// returns their output channels. For n <= 0 no stage is started and the
// result is nil.
func fanOut(in <-chan string, n int) []<-chan string {
	var workers []<-chan string
	for remaining := n; remaining > 0; remaining-- {
		workers = append(workers, parseStrings(in))
	}
	return workers
}

// parseStrings starts a goroutine that applies parseLine to every value
// received from in and forwards the result on the returned channel. The
// output channel is closed once in has been closed and drained.
func parseStrings(in <-chan string) <-chan string {
	parsed := make(chan string)
	go func() {
		defer close(parsed)
		for line := range in {
			parsed <- parseLine(line)
		}
	}()
	return parsed
}

// parseLine is the per-line parsing hook of the pipeline; for now it is the
// identity function.
func parseLine(line string) string {
	result := line
	return result
}

// merge multiplexes all channels in cs onto one output channel (fan-in).
// One forwarding goroutine runs per input channel; a WaitGroup counts them,
// and the output channel is closed only after every input has been drained,
// so a consumer can simply range over the result.
func merge(cs []<-chan string) <-chan string {
	out := make(chan string)

	var pending sync.WaitGroup
	pending.Add(len(cs))

	// forward copies one input channel to out and marks itself done when the
	// input is closed.
	forward := func(src <-chan string) {
		defer pending.Done()
		for v := range src {
			out <- v
		}
	}
	for _, src := range cs {
		go forward(src)
	}

	// Close out exactly once, after all forwarders have finished.
	go func() {
		pending.Wait()
		close(out)
	}()
	return out
}

Check it out on the playground

1

var wg sync.WaitGroup

在啟動每個 goroutine 之前(不要在 goroutine 內部調用,否則可能與 wg.Wait() 產生競爭)調用:

wg.Add(1) 

當 goroutine 完成工作時,調用下面的方法遞減計數器:

wg.Done() 

最後,不要再用下面這個永不結束的循環來等待:

for { 
    time.Sleep(1 * time.Second) 
} 

而是調用:

wg.Wait() 
1

只需使用sync.WaitGroup

package main 

import(
    "sync" 
) 

// stuff is a placeholder for one unit of concurrent work. It signals wg when
// it returns, so the caller must have called wg.Add for it beforehand.
func stuff(wg *sync.WaitGroup) {
	// Deferred so the counter is decremented on every return path, including a panic.
	defer wg.Done()

	// ... the actual work goes here ...
}

// main launches a fixed number of worker goroutines and blocks until every
// one of them has called Done on the shared WaitGroup.
func main() {
	const count = 50

	var wg sync.WaitGroup
	// Register all workers up front, before any of them can finish.
	wg.Add(count)
	for started := 0; started < count; started++ {
		go stuff(&wg)
	}

	// Block until the counter drops back to zero.
	wg.Wait()
}

Play