2017-02-23 92 views
1

我們有一堆文件要處理後上傳到遠程blob存儲。去隊列處理失敗後重試

目前,前端(PHP)創建了一個這樣的文件的redis列表,並給它一個唯一的ID,稱爲JobID。然後它將這個唯一的ID傳遞給一個豆杆管,這個管通過一個Go進程接收。它使用名爲Go workers的庫來按照net/http所做的方式處理每個作業ID。它接收作業ID,檢索redis列表並開始處理文件。

但是,目前一次只能處理一個文件。由於這裏的操作是I/O綁定,而不是CPU綁定,直覺表明,使用每個文件的goroutine會有好處。

但是,我們希望在失敗時重試上傳,並跟蹤每個作業處理的項目數量。我們無法啓動一個未綁定的goroutine數量,因爲一個Job可以包含大約10k個文件來處理,並且在高峯時間內每秒可以發送100個這樣的作業。這將是什麼正確的方法?

NB:我們可以改變技術,如果需要的堆棧中的位(如換掉的東西beanstalkd)

回答

2

您可以通過使用一個緩衝chan的大小夠程的最大數量的限制夠程的數量你要。如果達到最大容量,則可以阻止此chan。隨着您的goroutines完成,他們將釋放插槽以允許新goroutines運行。

例子:

package main 

import (
    "fmt" 
    "sync" 
) 

var (
    concurrent = 5 
    semaphoreChan = make(chan struct{}, concurrent) 
) 

func doWork(wg *sync.WaitGroup, item int) { 
    // block while full 
    semaphoreChan <- struct{}{} 

    go func() { 
     defer func() { 
      // read to release a slot 
      <-semaphoreChan 
      wg.Done() 
     }() 
     // This is where your work actually gets done 
     fmt.Println(item) 
    }() 
} 

func main() { 
    // we need this for the example so that we can block until all goroutines finish 
    var wg sync.WaitGroup 
    wg.Add(10) 

    // start the work 
    for i := 0; i < 10; i++ { 
     doWork(&wg, i) 
    } 

    // block until all work is done 
    wg.Wait() 
} 

去遊樂場鏈接:https://play.golang.org/p/jDMYuCe7HV

本Golang英國會議的啓發談話:https://youtu.be/yeetIgNeIkc?t=1413

+0

它幫助我開始使用限制併發性。然而,現在仍然存在的問題是如何跟蹤工作的成敗。作業包含N個子任務,所有這些都必須成功處理,否則需要報告錯誤。我如何解決這個問題? – agathver

+0

創建一個您傳遞給goroutine的頻道。該goroutine可以將操作的結果寫入該通道,包括錯誤。調用者可以根據需要從該通道中獲取信息來處理錯誤(例如記錄錯誤或重試操作)。如果您需要重試該操作,請使用具有必要上下文的自定義結構類型來重試該通道(例如,goroutine需要再次嘗試的輸入)並出現錯誤。 – MahlerFive