2014-06-16 41 views
4

我有一個Python 2.7程序,它從網站提取數據並將結果轉儲到數據庫。它遵循消費者生產者模型,並使用線程模塊編寫。使用Python的asyncio按順序獲取數據

只是爲了好玩我想重寫這個程序使用新的asyncio模塊(從3.4),但我不知道如何正確地做到這一點。

最重要的要求是程序必須按順序從同一網站獲取數據。例如,對於一個網址http://a-restaurant.com應該首先得到http://a-restaurant.com/menu/0,然後http://a-restaurant.com/menu/1」,然後http://a-restaurant.com/menu/2 ... 如果他們不是爲了獲取網站停止提供網頁乾脆你必須從0

然而另一個取另一個網站(http://another-restaurant.com)可以(也應該)運行在同一時間(其他網站也有sequantial限制)開始。

線程模塊適合這一點,因爲我可以爲每個網站創建單獨的線程,並且在每個線程中它可以等到一個頁面完成加載之後才能獲取另一個線程。

這裏是從線程版本(Python 2.7版)一個非常簡單的代碼片段:

class FetchThread(threading.Threading) 
    def __init__(self, queue, url) 
     self.queue = queue 
     self.baseurl = url 
    ... 
    def run(self) 
     # Get 10 menu pages in a sequantial order 
     for food in range(10): 
      url = self.baseurl + '/' + str(food) 
      text = urllib2.urlopen(url).read() 
      self.queue.put(text) 
      ... 
def main() 
    queue = Queue.Queue() 
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu') 
    for url in urls: 
     fetcher = FetchThread(queue, url) 
     fetcher.start() 
     ... 

這裏就是我試圖用ASYNCIO做到這一點(3.4.1):

@asyncio.coroutine 
def fetch(url): 
    response = yield from aiohttp.request('GET', url) 
    response = yield from response.read_and_close() 
    return response.decode('utf-8') 

@asyncio.coroutine 
def print_page(url): 
    page = yield from fetch(url) 
    print(page) 


l = [] 
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu') 
for url in urls: 
    for food in range(10): 
     menu_url = url + '/' + str(food) 
     l.append(print_page(menu_url)) 

loop.run_until_complete(asyncio.wait(l)) 

它以非順序的順序提取和打印所有內容。那麼,我想這就是那些協同程序的全部想法。我應該不使用aiohttp並只使用urllib獲取?但是,第一家餐廳的提貨會阻止其他餐館的提貨嗎?我只是覺得這完全錯了嗎? (這只是一個嘗試按順序獲取東西的測試,還沒有到隊列部分)

回答

5

您當前的代碼對於不關心順序排序請求的餐廳。菜單的所有十個請求都將同時運行,並在完成後立即打印到stdout。

顯然,這不適用於需要連續請求的餐廳。您需要重構一點對於工作:

@asyncio.coroutine 
def fetch(url): 
    response = yield from aiohttp.request('GET', url) 
    response = yield from response.read_and_close() 
    return response.decode('utf-8') 

@asyncio.coroutine 
def print_page(url): 
    page = yield from fetch(url) 
    print(page) 

@syncio.coroutine 
def print_pages_sequential(url, num_pages): 
    for food in range(num_pages): 
     menu_url = url + '/' + str(food) 
     yield from print_page(menu_url) 

l = [print_pages_sequential('http://a-restaurant.com/menu', 10)] 

conc_url = 'http://another-restaurant.com/menu' 
for food in range(10): 
    menu_url = conc_url + '/' + str(food) 
    l.append(print_page(menu_url)) 

loop.run_until_complete(asyncio.wait(l)) 

而不是增加的順序餐廳列表所有十個請求,我們一個協程增加,這將依次遍歷所有十頁的列表。這種方式的效果是,yield from print_page將停止print_pages_sequential的執行,直到print_page請求完成,但它會這樣做,而不會阻止任何其他併發運行的協同程序(例如所有print_page調用附加到l)。

通過這樣做,您所有的「其他餐館」請求都可以完全併發運行,就像您想要的那樣,您的「a-restaurant」請求將按順序運行,但不會阻止任何「餐廳「的要求。

編輯:

如果所有的網站都有相同的順序取要求,邏輯可以簡化更多:

l = [] 
urls = ["http://a-restaurant.com/menu", "http://another-restaurant.com/menu"] 
for url in urls: 
    menu_url = url + '/' + str(food) 
    l.append(print_page_sequential(menu_url, 10)) 

loop.run_until_complete(asyncio.wait(l)) 
+0

謝謝@dano。需要說明的是:所有餐廳都需要在菜單中順序提取數據,但我想同時從第一家餐廳和第二家餐廳提取數據(只是他們各自的菜單提取需要按順序進行)。所以我想解決方案是'l = [print_pages_sequential('http://a-restaurant.com/menu',10),print_pages_sequential('http://another-restaurant.com/menu',10)]'然後運行'loop.run_until_complete(asyncio.wait(l))'(現在不能測試這個) – mat

+1

@ user3313978啊,對不起,我誤解了這個要求。考慮到約束條件,您對解決方案的假設是正確的。我更新了我的答案以反映新的約束。 – dano

+0

這仍然不會按順序啓動請求,@dano。不幸的是,當「收集」和「等待」將任何傳入的協同程序包裝到「任務」中時,它們將以非確定性順序進行安排。請參閱[asyncio issue#432](https://github.com/python/asyncio/issues/432)。解決方法是在將每個協程對象傳遞給'gather'或'wait'之前,手動爲每個協程對象分配一個循環任務。例如'l.append(loop.create_task(print_page_sequential(menu_url,10)))' –

2

asyncio.Task替代threading.Thread ASYNCIO世界。 asyncio.async也創建新的任務。

asyncio.gather是等待幾個協程的非常方便的方法,我更喜歡它而不是asyncio.wait

@asyncio.coroutine 
def fetch(url): 
    response = yield from aiohttp.request('GET', url) 
    response = yield from response.read_and_close() 
    return response.decode('utf-8') 

@asyncio.coroutine 
def print_page(url): 
    page = yield from fetch(url) 
    print(page) 

@asyncio.coroutine 
def process_restaurant(url): 
    for food in range(10): 
     menu_url = url + '/' + str(food) 
     yield from print_page(menu_url) 

urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu') 
coros = [] 
for url in urls: 
    coros.append(asyncio.Task(process_restaurant(url))) 

loop.run_until_complete(asyncio.gather(*coros)) 
+0

很高興知道。 'asyncio'似乎比我預想的要複雜一些。順便說一句,'def process_restaurant(url)'缺少一個縮進級別。 – mat

+0

「process_restaurant」的標記已修復。感謝報告。 –

相關問題