2010-06-07 18 views
3

對於我的項目的某些部分,我需要一個流程本地調度系統,它允許我在幾秒鐘內延遲方法執行。我有成千上萬的這個系統的「客戶端」,所以每個延遲使用threading.Timer是一個壞主意,因爲我會很快達到操作系統線程限制。我的implemented系統只使用一個線程進行時序控制。Python中的快速方法調用調度

主要想法是保持排序的任務(時間+ func + args + kwargs)隊列,並使用單個threading.Timer來調度/取消該隊列頭部的執行。這個方案很有效,但我對錶演不滿意。 〜大約2000個客戶端每10秒計劃虛擬任務會導致進程花費40%的CPU時間。縱觀剖析器輸出,我發現所有時間都花在新的構建,開始,特別是創建新線程上。

我相信還有更好的辦法。現在我考慮重寫LightTimer,以便將有一個執行線程可由threading.Event控制,並有多個定時線程可用於事件。例如:

  • 我在10秒內安排了一個任務調用。該任務被添加到隊列中。計時線程#1在event.set()
  • 之前開始time.sleep(10)然後我安排一個任務在11秒內打電話。該任務被添加到隊列中。計時線程沒有任何反應,它會在醒來後注意到新的任務。
  • 然後我安排一個任務在5秒內致電。該任務被添加到隊列中。定時線程#2開始time.sleep(5),因爲#1已經睡了一段較長的時間間隔。

我希望你已經發現了這個想法。你怎麼看待這種方式?有沒有更好的辦法?也許我可以利用一些linux系統功能來制定最佳解決方案?

回答

2

您可以使用的另一種實現方式是使用time.time()方法來計算每個排隊函數應該執行的絕對時間。將此時間和要調用的函數放置在一個對象包裝器中,該對象包裝器使用執行時間來覆蓋比較運算符以確定順序。然後使用heapq模塊來維護最小堆。這將爲您提供一個高效的數據結構,其中堆的元素0始終是您的下一個事件。

實現實際調用的一種方法是使用單獨的線程來執行回調。堆需要用互斥鎖保護,您可以使用條件變量來實現計劃。在無限循環中,只需查找下一次執行函數(堆的元素0)並使用條件變量的wait()方法,並將超時設置爲下一個執行時間。如果新插入的函數應該在堆中最早的函數之前發生,那麼您的堆插入方法可以使用條件變量的notify()方法提前喚醒調度線程。

+0

嗯......非常有趣,謝謝!特別是我從來沒有看過'heapq',也沒有考慮過使用'threading.Condition'來達到這個目的。 – nkrkv 2010-06-07 15:49:25

2

你看過Python標準庫中的sched模塊嗎?在專用線程上運行調度程序(並且讓所有調度的操作「將綁定方法及其參數放入隊列中」,池中的線程將從中剝離並執行它 - 正如我在線程的Nutshell章節中所寫的那樣,除非那種情況下沒有調度)應該做你想做的事情。

+0

以及如何處理文檔中的一條語句*「調度程序類在線程安全性方面存在限制,無法在正在運行的調度程序中當前未決的新任務之前插入新任務」*?這是否意味着我可以在一個當前未決的,但來自同一個線程之前插入一個新的任務,或者我必須產生多個運行sched的線程? – nkrkv 2010-06-07 15:42:11

+0

@nailxx,它確實需要一些注意 - 該警告涉及到「天真」的用法,比如使用「normal」delayfunc(只是time.sleep)或從多個線程執行調度程序的方法。使用delayfunc在專用隊列上執行Queue.get超時(一旦任何事情被推入隊列,就會被喚醒......並在需要時將其拉出並執行調度方法調用) - 然後從其他線程推送到該專用隊列代替從它們調用調度程序方法 - 我已成功在類似的多線程場景中使用單個調度程序實例。 – 2010-06-07 18:23:45

0

您不可能通過「幾千客戶端」達到操作系統線程限制;儘管所有這些線程的堆棧可能會消耗大量不必要的內存。

看看扭曲了什麼,它允許一個進程以一種證明可以很好地處理大量事件的方式複用很多事件(包括定時器)。

您還可以結合事件驅動和多進程模型,每臺機器運行多個進程並在每個進程中執行事件驅動的邏輯 - 比如說一個進程可以處理2,000個客戶端,您仍然可以運行30個進程是足夠的整體資源)並且獲得更好的吞吐量,特別是在現代多核硬件上。

+0

*您不可能達到操作系統線程限制* - 我確實是這麼做的:) – nkrkv 2010-06-08 07:13:05

+0

我希望您能在所有這些堆棧中用盡地址空間。 Linux默認堆棧大小通常爲1M(或類似),因此只需要幾千個線程即可在32位進程中使用A/S。操作系統有更高的限制。 – MarkR 2010-06-08 08:29:12