2014-10-30 37 views
3

我對Go非常陌生,正在尋找一種方法來處理使用100個工作人員的3000個查詢,並確保每個工作人員的連接(MySQL已經配置了超過100個連接)。這是我的嘗試:處理查詢的工作池

package main 

import (
    "database/sql" 
    _ "github.com/go-sql-driver/mysql" 
) 

var query *sql.Stmt 

func worker(jobs <-chan int, results chan<- int) { 

    for _ = range jobs { 

     _, e := query.Exec("a") 

     if e != nil { 

      panic(e.Error()) 
     } 

     results <- 1 
    } 
} 

func main() { 

    workers := 100 

    db, e := sql.Open("mysql", "foo:[email protected]/foo") 

    if e != nil { 

     panic(e.Error()) 
    } 

    db.SetMaxOpenConns(workers) 
    db.SetMaxIdleConns(workers) 

    defer db.Close() 

    query, e = db.Prepare("INSERT INTO foo (foo) values(?)") 

    if e != nil { 

     panic(e.Error()) 
    } 

    total := 30000 
    jobs := make(chan int, total) 
    results := make(chan int, total) 

    for w := 0; w < workers; w++ { 

     go worker(jobs, results) 
    } 

    for j := 0; j < total; j++ { 

     jobs <- j 
    } 

    close(jobs) 

    for r := 0; r < total; r++ { 

     <-results 
    } 
} 

它的工作,但我不知道如果是這樣做的最佳方式。

請,如果您認爲這是基於意見或根本不是一個好問題,請將其標記爲關閉並留下評論以解釋原因。

+0

[Rob Pike的「併發性不是並行性」](http://vimeo.com/49718712)從第19分鐘開始 - 「一個非常簡單的負載平衡器」,它可以在一些工作人員之間分配工作。 – thwd 2014-10-30 16:37:43

+1

@tomwilde,當然,我正在閱讀http://talks.golang.org/2012/waza.slide#40謝謝! – coma 2014-10-30 16:45:02

+0

@tomwilde,在這個例子中,Rob並沒有緩衝頻道,但是如果我不使用任務總數來緩衝頻道,那麼我只能獲得工作完成後的工作人員數量。 – coma 2014-10-30 17:10:50

回答

1

你有什麼根本的工作,但爲了擺脫緩衝,你需要寫信給jobsresults在同一時間讀。否則,您的流程結束了 - 工作人員無法發送結果,因爲沒有收到結果,並且由於工作人員被阻止,您無法插入作業。

這裏有一個如何爲它接收main結果做推動工作在後臺的工作隊列a boiled-down example on the Playground

package main 

import "fmt" 

func worker(jobs <-chan int, results chan<- int) { 
    for _ = range jobs { 
     // ...do work here... 
     results <- 1 
    } 
} 

func main() { 
    workers := 10 
    total := 30 
    jobs := make(chan int) 
    results := make(chan int) 

    // start workers 
    for w := 0; w < workers; w++ { 
     go worker(jobs, results) 
    } 

    // insert jobs in background 
    go func() { 
     for j := 0; j < total; j++ { 
      jobs <- j 
     } 
    }() 

    // collect results 
    for i := 0; i < total; i++ { 
     <-results 
     fmt.Printf(".") 
    } 

    close(jobs) 
} 

對於該特定代碼的工作,你必須知道你有多少結果會得到。如果你不知道(比如,每個作業可能產生零個或多個結果),你可以使用一個sync.WaitGroupwait for the workers to finish, then close the result stream

package main 

import (
    "fmt" 
    "sync" 
) 

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { 
    for _ = range jobs { 
     // ...do work here... 
     results <- 1 
    } 
    wg.Done() 
} 

func main() { 
    workers := 10 
    total := 30 
    jobs := make(chan int) 
    results := make(chan int) 
    wg := &sync.WaitGroup{} 

    // start workers 
    for w := 0; w < workers; w++ { 
     wg.Add(1) 
     go worker(jobs, results, wg) 
    } 

    // insert jobs in background 
    go func() { 
     for j := 0; j < total; j++ { 
      jobs <- j 
     } 
     close(jobs) 
     wg.Wait() 
     // all workers are done so no more results 
     close(results) 
    }() 

    // collect results 
    for _ = range results { 
     fmt.Printf(".") 
    } 

} 

還有很多其他更復雜的花樣也行後,停止所有工作人員發生錯誤,將結果與原始作業的順序相同,或執行其他操作。聽起來好像基本版本在這裏工作,但。