2011-08-02

Answers

-2

Use the select module to poll, or use the threading module to handle this with threads.

+0

If you are already using Twisted, these are both bad suggestions. – Glyph

+2

This person already seems to be using Twisted. Care to elaborate on why these are bad choices? – stderr

8
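from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, DeferredQueue
from twisted.web.client import Agent, readBody

agent = Agent(reactor)

def download(url):
    # one possible download: fetch url (bytes on Python 3) and fire with the body
    d = agent.request(b"GET", url)
    d.addCallback(readBody)
    return d

def process(data):
    # placeholder: do stuff with the downloaded bytes
    print(len(data))

@inlineCallbacks
def worker(queue):
    while True:
        url = yield queue.get()  # wait for a url from the queue

        if url is None:  # insert None into the queue to kill workers
            queue.put(None)  # pass the sentinel on so the other workers stop too
            return  # done

        data = yield download(url)  # download the file
        process(data)  # do stuff with it


queue = DeferredQueue()  # your queue

# make workers; up to MAX downloads run at the same time
MAX = 20
workers = [worker(queue) for _ in range(MAX)]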
+0

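This works, but it doesn't download the files in parallel. It might be worth adding an example that allows multiple concurrent downloads. – Glyph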

+2

@Glyph: It won't download as fast as possible (each worker waits for processing to finish), but with 20 "workers" there should be some parallelism. –

+0

Oops. I completely missed the loop at the bottom there. Please ignore my comment :). – Glyph

2

The following is a translation of https://github.com/caolan/async into Python.

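from twisted.internet import defer

class Queue:
    def __init__(self, worker, concurrency):
        self.worker = worker            # function(data) -> Deferred
        self.concurrency = concurrency  # max number of tasks running at once
        self.workers = 0                # number of tasks currently running
        self.tasks = []                 # per instance (a class-level list would be shared)
        self.saturated = None           # optional hook: concurrency limit reached
        self.empty = None               # optional hook: last queued task handed out
        self.drain = None               # optional hook: all tasks finished

    def push(self, data):
        deferred = defer.Deferred()
        self.tasks.append({'data': data, 'callback': deferred})
        if self.saturated and len(self.tasks) == self.concurrency:
            self.saturated()
        self.process()
        return deferred  # fires with the worker's result for this task

    def task_finished(self, result):
        self.workers -= 1
        if self.drain and len(self.tasks) + self.workers == 0:
            self.drain()
        self.process()
        return result  # pass the worker's result (or failure) down the chain

    def process(self):
        if self.workers >= self.concurrency or len(self.tasks) == 0:
            return
        task = self.tasks.pop(0)
        if self.empty and len(self.tasks) == 0:
            self.empty()
        self.workers += 1
        d = self.worker(task['data'])
        d.addBoth(self.task_finished)      # runs on success and on failure
        d.chainDeferred(task['callback'])  # fire the Deferred returned by push()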

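from twisted.web import client
from twisted.internet import reactor

def dl_worker(data):
    url, fname = data
    print("Download file:", fname)
    # downloadPage is deprecated in newer Twisted releases; on Python 3 the URL must be bytes
    d = client.downloadPage(url, fname)
    return d  # very important! The Queue waits on this Deferred

q = Queue(dl_worker, 2)   # at most 2 downloads at once
q.drain = reactor.stop    # stop the reactor once every download has finished
for i in range(3):
    q.push([b"http://download.thinkbroadband.com/5MB.zip", "file" + str(i)])
reactor.run()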

I hope this passes Glyph's QC :D Cheers!