2010-01-02 22 views
3

場景:我們有一個python腳本,可以同時檢查數千個proxys。 程序使用線程(每個代理1個)來加速進程。當它到達1007線程時,由於線程限制,腳本崩潰。
我的解決方案是:一個全局變量,當一個線程完成時線程產生和減少時遞增。產生線程的函數監視變量,以便達不到限制。
你的解決方案是什麼,朋友?繞過os線程限制的編程策略?

感謝您的答案。

回答

5

您想要使用select module執行非阻塞I/O。

有幾種不同的特定技術。 select.select應該適用於每個主要平臺。還有其他一些更高效的變體(如果您同時檢查數以萬計的連接,則可能很重要),但是您需要爲您的特定平臺編寫代碼。

+0

這可能非常好,我在「異步套接字編程」中看到了這種技術。可能比具有多個固定線程的解決方案好得多。 – 19021programmer 2010-01-02 22:43:11

3

Python是否具有任何類型的異步IO功能?這將是首選答案IMO - 爲每個出站連接產生一個額外的線程並不像有一個事件驅動的單線程那麼簡單。

+0

那麼你已經把我放在了一個很好的軌道上,謝謝。 – 19021programmer 2010-01-02 22:44:06

1

使用select模塊或類似的庫很可能是更有效的解決方案,但這需要更大的體系結構更改。

如果你只是想限制線程的數量,一個全局計數器應該沒問題,只要你以線程安全的方式訪問它。

2

使用不同的進程和管道傳輸數據。在Python中使用線程是非常蹩腳的。據我所知,即使你有一個多核處理器,它們也不會並行運行......但也許它已經在python3中修復了。

+0

實際上,在大多數語言中使用線程會導致鎖定到單個處理器或具有如此多的內存障礙以至於多個處理器在鎖定步驟中運行。這不是Python特有的。 – 2010-01-02 23:20:51

+1

+1:我會推薦多個線程的多個進程。 – 2010-01-02 23:21:49

4

我以前遇到過這種情況。只需製作一個任務池,併產生固定數量的線程,這些線程運行一個無限循環,從池中抓取任務,運行並重復。本質上,你正在實現自己的線程抽象,並使用操作系統線程來實現它。

這確實有缺點,主要的一點是如果您的任務阻塞很長一段時間,他們可以阻止執行其他任務。但它確實可以讓您創建無限數量的任務,僅受內存限制。

+0

這聽起來相當不錯。我不必擔心線程號。但是任務是檢查具有可變等待時間的代理。我應該做一些統計來找出線程的最佳常量。 – 19021programmer 2010-01-02 22:36:41

+0

可以肯定我不會阻止其他任務。 – 19021programmer 2010-01-02 22:37:14

+0

我猜你有2個選項。選項1是產生1006個OS線程。如果您確實想知道您需要的最小線程數,請繼續添加線程,直到達到100%的CPU利用率。這將意味着你已經超過了所有的I/O等待。 – 2010-01-02 22:56:00

1

小心減少默認線程堆棧大小。至少在Linux上,默認限制會對創建的線程數量造成嚴重限制。 Linux將進程虛擬地址空間塊分配給堆棧(通常爲10MB)。 300個線程×10MB堆棧分配=專用於堆棧的3GB虛擬地址空間,而在32位系統上則有3GB限制。你可能會少得多。

2

我的解決方案是:一個全局變量,當一個線程完成時線程產生和減少時會增加。產生線程的函數監視變量,以便達不到限制。

標準的方法是讓每個線程在一個循環中獲得下一個任務,而不是在處理一個之後死亡。這樣你就不必跟蹤線程的數量,因爲你只需要觸發一定數量的線程。作爲獎勵,你節省了線程創建/銷燬。

2

一個計數信號應該做的伎倆。

from socket import * 
from threading import * 

maxthreads = 1000 
threads_sem = Semaphore(maxthreads) 

class MyThread(Thread): 
    def __init__(self, conn, addr): 
     Thread.__init__(self) 
     self.conn = conn 
     self.addr = addr 
    def run(self): 
     try: 
      read = conn.recv(4096) 
      if read == 'go away\n': 
       global running 
       running = False 
      conn.close() 
     finally: 
      threads_sem.release() 

sock = socket() 
sock.bind(('0.0.0.0', 2323)) 
sock.listen(1) 
running = True 
while running: 
    conn, addr = sock.accept() 
    threads_sem.acquire() 
    MyThread(conn, addr).start() 
1

Twisted非常適合這個問題。有關編寫客戶端的教程,請參見http://twistedmatrix.com/documents/current/core/howto/clients.html

如果您不介意使用替代Python實現,則Stackless具有輕量級(非本地)線程。我認識的唯一一家公司是CCP,他們在客戶端和服務器上使用它作爲遊戲中的任務。您仍然需要使用Stackless進行異步I/O,因爲如果線程阻塞,進程將被阻止。

+0

非常感謝。我會嘗試扭曲。 – 19021programmer 2010-01-02 23:47:48

1

正如另一個線程中提到的那樣,爲什麼每次操作都會產生一個新線程?這是一個經典的生產者 - 消費者問題,不是嗎?取決於你如何看待它,代理檢查器可能是商業用戶或生產商。

無論如何,解決方案是對「任務」進行「排隊」處理,並使循環中的線程檢查是否有任何其他任務要在隊列中執行,如果沒有,請等待預定義的時間間隔,然後再次檢查。

您應該使用一些鎖定機制(即信號量)來保護您的隊列以防止競爭條件。

這其實並不困難。但它需要一點思考才能做到。祝你好運!