2017-10-18 158 views
-1

我試圖建立一個系統,工作池/ jobqueue,以儘可能多地在每個API端點上處理http requests。我看着這example,並得到它的工作很好,除了我偶然發現的問題,我不明白如何將pool/jobqueue擴大到不同的端點。Golang HTTP請求工作池

對於方案的緣故,讓我們勾畫有跨越不同的端點和請求類型一百萬請求/ min的Golang http服務器GET & POST ETC.

我該如何擴展這個概念?我應該爲每個端點創建不同的工作池和作業。或者我可以創建不同的作業並將它們輸入到同一個隊列中,並使用相同的池來處理這些作業?

我想保持簡單性,如果我創建一個新的API端點,我不必創建新的工作池,所以我可以專注於API。但表現也非常重要。

我試圖構建的代碼取自之前鏈接的示例,here是其他人使用此代碼的github'gist'。

+2

Go的http包爲每個傳入連接啓動一個go例程。除非你在談論後臺工作處理,否則這似乎是浪費精力。 – squiguy

+0

是的,這是爲了後臺處理。有些人可能需要一段時間才能完成,我寧願不讓一個不受控制的goroutines寬鬆 –

+0

goroutines有什麼問題?它們基本上是內置異步支持的jobqueue實現。 –

回答

0

不清楚爲什麼你需要工作人員池?會不會是足夠的goroutines?

如果您受資源限制,可以考慮實施rates limiting。如果不是,爲什麼根本不需要跨越例程呢?

最好的學習方法是研究別人如何做好事。

看一看https://github.com/valyala/fasthttp

快速HTTP包圍棋。調整爲高性能。熱路徑中的零內存分配。比net/http快10倍。

他們都聲稱:

從每個物理服務器

這是相當可觀的,我懷疑你可以做超過1.5M併發保持連接服務多達200K RPS用pool/jobqueue更好。

1

前面有一件事:如果您正在運行HTTP服務器(無論如何都是Go的標準服務器),則無法停止並重新啓動服務器就無法控制goroutine的數量。每個請求至少啓動一個goroutine,並且你無能爲力。好消息是,這通常不是問題,因爲goroutine非常輕巧。然而,你想保持正在努力工作的goroutines的數量是完全合理的。

您可以將任何值放入通道中,包括函數。因此,如果目標是隻需要在http處理程序中編寫代碼,那麼應該關閉這些工作 - 工作人員不知道(或關心)他們正在處理的是什麼。

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan func() 
var smallPool chan func() 

func main() { 
    // Start two different sized worker pools (e.g., for different workloads). 
    // Cancelation and graceful shutdown omited for brevity. 

    largePool = make(chan func(), 100) 
    smallPool = make(chan func(), 10) 

    for i := 0; i < 100; i++ { 
      go func() { 
        for f := range largePool { 
          f() 
        } 
      }() 
    } 

    for i := 0; i < 10; i++ { 
      go func() { 
        for f := range smallPool { 
          f() 
        } 
      }() 
    } 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay? 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    // Imagine a JSON body containing a URL that we are expected to fetch. 
    // Light work that doesn't consume many of *our* resources and can be done 
    // in bulk, so we put in in the large pool. 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      largePool <- func() { 
        http.Get(job.URL) 
        // Do something with the response 
      } 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    // The request body is an image that we want to do some fancy processing 
    // on. That's hard work; we don't want to do too many of them at once, so 
    // so we put those jobs in the small pool. 

    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      smallPool <- func() { 
        processImage(b) 
      } 
    }() 
    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 

這是一個非常簡單的例子來說明問題。設置工作池的方式並不重要。你只需要一個聰明的工作定義。在上面的例子中它是一個閉包,但是你也可以定義一個Job接口。現在

type Job interface { 
    Do() 
} 

var largePool chan Job 
var smallPool chan Job 

,我不會把整個工作池方法 「簡單」。你說你的目標是限制goroutines(正在工作)的數量。這根本不需要工人;它只需要一個限制器。這和上面的例子是一樣的,但是使用通道作爲信號來限制併發。

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan struct{} 
var smallPool chan struct{} 

func main() { 
    largePool = make(chan struct{}, 100) 
    smallPool = make(chan struct{}, 10) 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(largePool) light-work 
      // goroutines running. 
      largePool <- struct{}{} 
      defer func() { <-largePool }() // Let everyone that we are done 

      http.Get(job.URL) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(smallPool) hard-work 
      // goroutines running. 
      smallPool <- struct{}{} 
      defer func() { <-smallPool }() // Let everyone that we are done 

      processImage(b) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 
0

正如以前在服務器中回答的那樣,每個請求處理程序將至少在一個goroutine中運行。

但你仍然可以在必要時使用工人池後端並行任務。例如,讓我們假設一些HttpHandler函數觸發對其他外部API的調用並將它們的結果「彙總」在一起,因此在這種情況下調用的順序並不重要,這是一種可以利用工作池並分發爲了工作,讓他們並行調度每個任務運行,以一個工人的goroutine:

的示例代碼段:

// build empty response 
    capacity := config.GetIntProperty("defaultListCapacity") 
    list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0) 

    // search providers 
    providers := getProvidersByCountry(country) 

    // create a slice of jobResult outputs 
    jobOutputs := make([]<-chan job.JobResult, 0) 

    // distribute work 
    for i := 0; i < len(providers); i++ { 
     job := search(providers[i], m) 
     if job != nil { 
      jobOutputs = append(jobOutputs, job.ReturnChannel) 
      // Push each job onto the queue. 
      GetInstance().JobQueue <- *job 
     } 
    } 

    // Consume the merged output from all jobs 
    out := job.Merge(jobOutputs...) 
    for r := range out { 
     if r.Error == nil { 
      mergeSearchResponse(list, r.Value.(*model.ResponseList)) 
     } 
    } 
    return list 

。職工池中運行的「通用」異步任務完成例如:https://github.com/guilhebl/go-offer/blob/master/offer/repo.go

。使用的工作者庫lib:https://github.com/guilhebl/go-worker-pool