2010-07-06 41 views
0

我想用線程來處理流輸入。python線程和隊列無限數據輸入(流)

如何使下面的代碼爲無限輸入產生,例如通過使用itertools.count

下面的代碼會工作,如果: 「對於i在itertools.count():」被替換爲「的我的xrange(5):」

from threading import Thread 
from Queue import Queue, Empty 
import itertools 

def do_work(q): 
    while True: 
    try: 
     x = q.get(block=False) 
     print (x) 
    except Empty: 
     break 

if __name__ == "__main__": 
    work_queue = Queue() 
    for i in itertools.count(): 
    work_queue.put(i) 

    threads = [Thread(target=do_work, args=(work_queue,)) for i in range(8)] 

    for t in threads: t.start() 
    for t in threads: t.join() 
+0

如何讓下面的代碼做*什麼*? – 2010-07-10 14:42:10

回答

2

問題是,itertools.count生成無限序列。這意味着for循環將永遠不會結束。你應該把它放在它自己的函數中,並使它成爲一個單獨的線程。這樣,當工作線程從隊列中獲取數據時,您將擁有隊列增長。

+0

你是對的無限循環。我也是如此。但是,當我將itertools放入它自己的線程時,它給了我一個運行時錯誤。請張貼一些代碼。 – Joey 2010-07-06 10:33:09

+0

@Joey:什麼錯誤? – MattH 2010-07-06 10:38:30

+0

運行時錯誤Visual C++ 此應用程序請求運行時以非常規方式終止它。 – Joey 2010-07-06 10:40:22

1

也許我失去了一些東西,但不是那麼簡單,創建和for循環之前開始線程?

另外,當你沒有工作時,你的線程終止似乎是一個壞主意,因爲未來可能會有更多的工作顯示出來。當然,你希望他們阻止,直到有些工作可用?

+0

你對'do_work'中的'break'做了一個很好的說明。除非隊列緩衝了足夠數量的數據,否則工作線程可能會在更多信息放入隊列之前全部終止。 – unholysampler 2010-07-06 10:24:55

+0

不是真的爲我工作。請你可以發帖代碼 – Joey 2010-07-06 10:29:50

+0

@Joey這聽起來像一個家庭作業問題。如果是這樣,請標記爲這樣。 – jchl 2010-07-06 10:34:27

1

您需要用線程填充隊列。您需要管理隊列大小。特別是如果工人們花時間處理物品。您需要標記已完成的隊列項目。如果這與你關於twitter和「非常快速」輸入的其他問題有關,那麼就數據庫插入而言,你有更多的工作要做。

對於相當複雜的話題,您的問題太模糊了。即使你想要知道這並不容易,你似乎還不夠了解。我建議你對你想要做的事情有一些更具體的瞭解。

下面是一個填充和使用線程隊列的例子。隊列大小未被管理。

from threading import Thread 
from Queue import Queue, Empty, Full 
import itertools 
from time import sleep 


def do_work(q,wkr): 
    while True: 
    try: 
     x = q.get(block=True,timeout=10) 
     q.task_done() 
     print "Wkr %s: Consuming %s" % (wkr,x) 
     sleep(0.01) 
    except Empty: 
     print "Wkr %s exiting, timeout/empty" % (wkr) 
     break 
    sleep(0.01) 

def fill_queue(q,limit=1000): 
    count = itertools.count() 
    while True: 
    n = count.next() 
    try: 
     q.put(n,block=True,timeout=10) 
    except Full: 
     print "Filler exiting, timeout/full" 
     break 
    if n >= limit: 
     print "Filler exiting, reached limit - %s" % limit 
     break 
    sleep(0.01) 

work_queue = Queue() 

threads = [Thread(target=do_work, args=(work_queue,i)) for i in range(2)] 
threads.insert(0,Thread(target=fill_queue,args=(work_queue,100))) 

for t in threads: 
    t.start() 

for t in threads: 
    t.join() 

Wkr 0: Consuming 0 
Wkr 1: Consuming 1 
Wkr 0: Consuming 2 
Wkr 1: Consuming 3 
.... 
Wkr 1: Consuming 99 
Filler exiting, reached limit - 100 
Wkr 0: Consuming 100 
Wkr 1 exiting, timeout/empty 
Wkr 0 exiting, timeout/empty