2016-12-27 27 views
3

我該如何確定走頻道的哪一邊在另一邊等待?如何確定走頻道的哪一邊正在等待?

我想知道這個,所以我可以找出我的處理正在受到限制,並通過分配更多資源進行響應。

一些選項

2種方法我認爲雙方都需要的東西所以測量不太吵做記錄值的移動平均值,但是這不是一個bigproblem


  1. 使用一個定時器來檢查的時間%的消費者

等待在單個消費者的情況下,我可以從通道消耗後,我停止計時器之前啓動一個定時器得到一個記錄。我可以跟蹤每個獲取週期中等待和響應的時間百分比。

緩衝信道的
  • 試樣長度
  • 如果信道經常是0,這意味着我們正在消耗比發送速度更快。同樣,如果緩衝區已滿,我們發送速度比我們可以接收的速度快。我們可以隨時檢查我們頻道的長度,以確定運行緩慢的情況。


    出於性能原因或其他原因,是否有充分的理由選擇其中一個到另一個?有一個更簡單的解決方案來解決這個問題嗎?

    我正在執行N HTTP請求抓住在同一時間在多達W夠程內容,並在單個的goroutine運行發送所有內容的信道到一個processor,服務,其在將數據反饋給客戶端。

    每個工作任務將導致通道上發送大量消息。每個worker的任務可能需要幾分鐘才能完成。

    下圖總結了3名併發工作人員(W=3)的數據流。

    [worker: task 1] - 
             \ 
        [worker: task 2] - | --- [ channel ] --- [ processor ] -> [ client ] 
            /
        [worker: task 3] - 
    

    我想知道我是否應該請求過程中運行更多的工人(增加W)或更少的工人(減少W)。由於客戶端通過非常不同的速度進行連接,因此每個請求可能會有所不同。

    +0

    您是否嘗試過僅運行阻止配置文件? – JimB

    +0

    你可以使用'len()'和'cap()'作爲頻道 –

    +0

    @JimB我不清楚你的意思是「阻止配置文件」。 – turtlemonvh

    回答

    3

    達到目標的一種方法是使用「有限發送」和「有限接收」操作 - 如果您能夠提出合理的輪詢超時時間。

    當你的任何一個工作人員試圖通過通道發送一個完整的結果時,不要讓它永遠阻塞(直到通道緩衝區中有空間);相反,只允許它阻止一些最長時間。如果超時發生在通道緩衝區中有空間之前,您可以對該情況做出反應:計算髮生次數,調整未來最後期限,節流或減少工作人員數量等等。

    同樣,對於從處理器接收結果的「處理器」,您可以限制其阻塞的時間量。如果超時發生在有可用值之前,則處理器將停止運行。創造更多的工作人員來更快地餵食(假設工人將從這種並行性中受益)。

    這種方法的缺點是每個sendreceive operation的開銷creating timers

    草圖,這些聲明可以訪問到每個工人:

    const minWorkers = 3 
    var workers uint32 
    

    在每個工人夠程:

    atomic.AddUint32(&workers, 1) 
    for { 
        result, ok := produce() 
        if !ok { 
         break 
        } 
        // Detect when channel "p"'s buffer is full. 
        select { 
        case p <- result: 
        case <-time.After(500 * time.Millisecond): 
         // Hand over the pending result, no matter how long it takes. 
         p <- result 
         // Reduce worker count if above minimum. 
         if current := atomic.LoadUint32(&workers); current > minWorkers && 
          atomic.CompareAndSwapUint32(&workers, current, current-1) { 
          return 
         } 
         // Consider whether to try decrementing the working count again 
         // if we're still above the minimum. It's possible another one 
         // of the workers also exited voluntarily, changing the count. 
        } 
    } 
    atomic.AddUint32(&workers, -1) 
    

    注意,按照上面的方法,你可以實現通過定時如何同樣的效果長時間需要發送到通道p來完成,並且對其做出反應花費了太長的時間,而不是進行一個有界發送,然後是潛在的阻塞發送。不過,我這樣描繪了它,因爲我懷疑這樣的代碼會成熟到在超時到期時包含日誌和儀器計數器顛簸。

    同樣,在你的處理器的goroutine,你可以限制你阻止工人接收值的時間量:

    for { 
        select { 
        case result <- p: 
         consume(result) 
        case <-time.After(500 * time.Millisecond): 
         maybeStartAnotherWorker() 
        } 
    } 
    

    顯然,有很多的旋鈕可以附加到這個玩意兒。你最終將生產者的安排與消費者和生產者本身聯繫起來。引入生產者和消費者「抱怨」延遲的不透明「聽衆」,可以讓您打破這種循環關係,更輕鬆地改變管理您對擁堵的反應的政策。

    +0

    對不起,延遲迴復。這將適用於通道使用的超時始終小於從通道讀取的延遲時間的情況。如果我們想捕獲負載的週期性波動,我們可以依靠[nyqist](https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem)告訴我們,我們的採樣率需要非常高。在這種情況下,我們將把一個離散的數據流解釋爲一個連續的數據流,然後再次離散化爲樣本。 – turtlemonvh

    +0

    當然,我所說的獲得頻道的長度也有同樣的問題。您的解決方案意味着我們不需要使用緩衝通道來獲得準確的測量結果(並且所有事情都有更多的事件驅動);我需要一個後臺進程來完成抽樣,但是沒有定時器的開銷。您的計時器可能需要與發送到通道之間的估計時間綁定,這在某些情況下可能很難計算。它可能不是我所需要的最佳解決方案,但它絕對是一個很酷的主意 - 謝謝! – turtlemonvh