2013-05-05 52 views
2

我寫一個小的多線程HTTP文件下載,並希望能夠縮小可用線程的代碼遇到錯誤Python的 - 收縮的線程池動態/停止線程

的錯誤都會被具體到http服務器不允許任何連接的錯誤返回

例如,如果我設置了5個線程池,每個線程都試圖打開它自己的連接並下載一個文件塊。該服務器可能只允許2個連接,我相信會返回503錯誤,我想檢測到這一點並關閉一個線程,最終限制池的大小,大概只有服務器將允許的2個

我可以做一個線程本身?

是自我。 線程 _stop()是否足夠?

我還需要加入()嗎?

這裏是我的工人的類,它的下載,從隊列過程抓起,一旦下載它轉儲結果爲要保存resultQ由主線程文件

它在這兒,我想檢測一個HTTP 503和停止/終止/從可用池中取出一個線程 - 當然後面再添加失敗塊的隊列,以便剩餘的線程將處理它

class Downloader(threading.Thread): 
    def __init__(self, queue, resultQ, file_name): 
     threading.Thread.__init__(self) 
     self.workQ = queue 
     self.resultQ = resultQ 
     self.file_name = file_name 

    def run(self): 
     while True: 
      block_num, url, start, length = self.workQ.get() 
      print 'Starting Queue #: %s' % block_num 
      print start 
      print length 

      #Download the file 
      self.download_file(url, start, length) 

      #Tell queue that this task is done 
      print 'Queue #: %s finished' % block_num 
      self.workQ.task_done() 


    def download_file(self, url, start, length):   

     request = urllib2.Request(url, None, headers) 
     if length == 0: 
      return None 
     request.add_header('Range', 'bytes=%d-%d' % (start, start + length)) 

     while 1: 
      try: 
       data = urllib2.urlopen(request) 
      except urllib2.URLError, u: 
       print "Connection did not start with", u 
      else: 
       break 

     chunk = '' 
     block_size = 1024 
     remaining_blocks = length 

     while remaining_blocks > 0: 

      if remaining_blocks >= block_size: 
       fetch_size = block_size 
      else: 
       fetch_size = int(remaining_blocks) 

      try: 
       data_block = data.read(fetch_size) 
       if len(data_block) == 0: 
        print "Connection: [TESTING]: 0 sized block" + \ 
         " fetched." 
       if len(data_block) != fetch_size: 
        print "Connection: len(data_block) != length" + \ 
         ", but continuing anyway." 
        self.run() 
        return 

      except socket.timeout, s: 
       print "Connection timed out with", s 
       self.run() 
       return 

      remaining_blocks -= fetch_size 
      chunk += data_block 

     resultQ.put([start, chunk]) 

下面是我的init線程游泳池,進一步下來我把項目排隊

# create a thread pool and give them a queue 
for i in range(num_threads): 
    t = Downloader(workQ, resultQ, file_name) 
    t.setDaemon(True) 
    t.start() 

回答

1

我可以自己停止線程嗎?

請勿使用self._Thread__stop()。退出線程的run()方法就足夠了(您可以檢查標誌或從隊列中讀取標記值以知道何時退出)。

它在這裏,我想檢測一個HTTP 503並停止/殺死/從可用池中刪除一個線程 - 當然重新添加失敗的塊返回隊列,其餘的線程將處理它

您可以通過分離職責簡化代碼:

  • download_file()不應該嘗試在無限循環重新連接。如果有錯誤,讓我們調用download_file()的代碼在必要時重新提交
  • 關於併發連接數的控制可以封裝在一個Semaphore對象中。線程的數目可以從並行連接的數目在這種情況下
import concurrent.futures # on Python 2.x: pip install futures 
from threading import BoundedSemaphore 

def download_file(args): 
    nconcurrent.acquire(timeout=args['timeout']) # block if too many connections 
    # ... 
    nconcurrent.release() #NOTE: don't release it on exception, 
          #  allow the caller to handle it 

# you can put it into a dictionary: server -> semaphore instead of the global 
nconcurrent = BoundedSemaphore(5) # start with at most 5 concurrent connections 
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: 
    future_to_args = dict((executor.submit(download_file, args), args) 
          for args in generate_initial_download_tasks()) 

    while future_to_args: 
     for future in concurrent.futures.as_completed(dict(**future_to_args)): 
      args = future_to_args.pop(future) 
      try: 
       result = future.result() 
      except Exception as e: 
       print('%r generated an exception: %s' % (args, e)) 
       if getattr(e, 'code') != 503: 
        # don't decrease number of concurrent connections 
        nconcurrent.release() 
       # resubmit 
       args['timeout'] *= 2      
       future_to_args[executor.submit(download_file, args)] = args 
      else: # successfully downloaded `args` 
       print('f%r returned %r' % (args, result)) 

參見ThreadPoolExecutor() example不同。

+0

謝謝,我需要仔細閱讀。 我終於和你說的一樣得出了同樣的結論,只要退出線程run(),它就會停止嘗試從隊列中拔出。 我喜歡你所提出的建議,謝謝! – MikeM 2013-05-05 21:37:52

0

線程對象只是簡單地通過從run方法返回來終止線程 - 它不會調用stop。如果將線程設置爲守護進程模式,則不需要連接,否則主線程需要這樣做。線程通常使用resultq來報告它正在退出,並且主線程使用該信息來執行連接。這有助於順序終止您的流程。你可以在系統退出時得到奇怪的錯誤,如果python仍然在玩弄多個線程,並且它最好的方法就是這樣。

+0

但是正如你所看到的,只要有工件要從workQ中抓取,線程就會一直運行,如果一個線程遇到503我想通過1減少可用線程的數量..剩下的線程去處理什麼是留在工作中Q – MikeM 2013-05-05 16:16:16

1

你應該使用一個線程池來控制你的線程的生命:

然後當一個線程存在,您可以將消息發送給主線程(正在處理線程池),然後更改線程池的大小,並推遲將要清空的堆棧中的新請求或失敗請求。

tedelanay是絕對正確的,你給你的線程守護進程的狀態。沒有必要將它們設置爲守護進程。

基本上,你可以簡化你的代碼,你可以做一些事情如下:

import threadpool 

def process_tasks(): 
    pool = threadpool.ThreadPool(4) 

    requests = threadpool.makeRequests(download_file, arguments) 

    for req in requests: 
     pool.putRequest(req) 

    #wait for them to finish (or you could go and do something else) 
    pool.wait() 

if __name__ == '__main__': 
    process_tasks() 

其中arguments達到你的戰略。要麼給你的線程一個隊列作爲參數,然後清空隊列。或者,您可以在process_tasks中處理隊列,在池已滿時阻塞,並在線程完成時打開新線程,但隊列不爲空。這一切都取決於您的需求和您的下載程序的上下文。

資源:

+0

非常好的信息,謝謝!我沒有看到你如何使用線程池重新調整池的大小..我必須忽略一些明顯的東西? – MikeM 2013-05-05 16:46:31