
Async crawler does not work properly [python]

I am trying to create an asynchronous crawler and a synchronous crawler, and now I am facing a problem: the crawl results are different, but they should be equal (the crawl results are the same only if the depth is 1).

from bs4 import BeautifulSoup
import networkx as nx
import urllib
import urllib.request
from urllib.parse import urlparse
from urllib.parse import urljoin
import time
import asyncio
import aiohttp
from contextlib import closing


class Crawler:

    def __init__(self, delay, depth):
        self.delay = delay
        self.graph = nx.DiGraph()
        self.list_of_links = list()
        self.crawled_urls = list()
        self.depth = depth

    def validate_url(self, url):
        """Check if url is valid"""
        return 'http' in urlparse(url).scheme

    def run(self, async, start_list):
        if async:
            t1 = time.time()
            self.async_crawl(start_list, self.depth)
            t2 = time.time()
            print('Async seconds passed: ', t2 - t1)
        else:
            t1 = time.time()
            for elem in start_list:
                self.crawl(elem, self.depth)
            t2 = time.time()
            print('Sync seconds passed: ', t2 - t1)
        print('Links crawled: ', len(self.crawled_urls))
        print('Edges stored: ', len(self.list_of_links))
        print('Depth: ', self.depth)

    def crawl(self, url, depth):
        if url in self.crawled_urls:
            return []
        if depth and self.validate_url(url):
            self.crawled_urls.append(url)
            links = self.get_links(url)
            for link in links:
                self.list_of_links.append((url, link))
                self.crawl(link, depth - 1)
        else:
            return []

    async def fetch_page(self, session, url):
        """Get one page."""
        if url in self.crawled_urls:
            return []
        else:
            self.crawled_urls.append(url)
        try:
            with aiohttp.Timeout(10):
                async with session.get(url) as response:
                    assert response.status == 200
                    new_urls = self.parse_for_links(url, await response.text())
                    for new_url in new_urls:
                        self.list_of_links.append((url, new_url))
                    return new_urls
        except:
            return []

    def async_crawl(self, urls, depth):
        """Get multiple pages."""
        if depth:
            with closing(asyncio.get_event_loop()) as loop:
                with aiohttp.ClientSession(loop=loop) as session:
                    tasks = [self.fetch_page(session, url) for url in urls if self.validate_url(url)]
                    new_urls = loop.run_until_complete(asyncio.gather(*tasks))
                    if new_urls:
                        self.async_crawl(new_urls[0], depth - 1)

    def parse_for_links(self, url, text):
        soup = BeautifulSoup(text, "html.parser")
        return [urljoin(url, tag['href']) for tag in soup.findAll('a', href=True)]

    def get_links(self, url):
        try:
            req = urllib.request.urlopen(url)
            req = map(lambda x: x.decode('utf-8'), req)
            return self.parse_for_links(url, ''.join(list(req)))
        except:
            return []

    def reset(self):
        self.list_of_links = list()
        self.crawled_urls = list()
        self.graph = nx.DiGraph()

    def visualize(self):
        self.graph.add_edges_from(self.list_of_links)
        nx.write_gexf(self.graph, "graph.gexf")


test2 = ['http://www.aclweb.org/anthology/']
cr = Crawler(10, 2)
cr.run(True, test2)
cr.reset()
cr.run(False, test2)

As an example, here is one of my test cases:

Async seconds passed: 13.632593870162964 
Links crawled: 371 
Edges stored: 15374 
Depth: 2 
Sync seconds passed: 385.6858592033386 
Links crawled: 371 
Edges stored: 102755 
Depth: 2 

You never wait for 'self.async_crawl()' to finish in your recursive definition of 'async_crawl'. Also, your recursive calls spawn thousands of event loops, which is probably not what you want to do. Implement this function with a queue and some mutexes to limit concurrency. – Blender
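
For reference, here is a minimal sketch of the first point in that comment, assuming the two methods below replace async_crawl inside the Crawler class above: keep a single event loop, await the recursive step, and follow the links of every fetched page instead of only new_urls[0]. The _crawl_level name is hypothetical, not from the post.

    async def _crawl_level(self, session, urls, depth):
        # Fetch one "layer" of pages, then recurse on all discovered links.
        if not depth:
            return
        tasks = [self.fetch_page(session, url) for url in urls if self.validate_url(url)]
        results = await asyncio.gather(*tasks)   # one list of links per fetched page
        next_urls = [link for links in results for link in links]
        await self._crawl_level(session, next_urls, depth - 1)

    def async_crawl(self, urls, depth):
        # One event loop for the whole crawl; the recursion is awaited inside it.
        loop = asyncio.get_event_loop()

        async def run():
            async with aiohttp.ClientSession() as session:
                await self._crawl_level(session, urls, depth)

        loop.run_until_complete(run())

Even then the edge counts may not match the synchronous crawler exactly, because the two versions visit and de-duplicate pages in a different order.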


@Blender Can you tell me where I can find some articles about queues? –

Answer


Perhaps the best way is to use producers and consumers.

import asyncio
import aiohttp

from redd import redis  # just a module to take the URL data from

query = asyncio.Queue()
locker = []


async def producer(num):
    baseurl = redis.db.data
    while True:
        try:
            url = next(baseurl)
        except StopIteration:
            print('Producer {} end'.format(num))
            break
        else:
            await query.put(url)


async def consumer(num):
    flag = True

    while flag:
        url = await query.get()
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url) as response:
                result = await response.read()
                print(result)
        if query.empty() and locker[num] is not True:
            locker[num] = True
            print('Thread number {} is END: {}'.format(num, locker[num]))
        if False not in locker:
            for task in asyncio.Task.all_tasks():
                task.cancel()
            loop.stop()


loop = asyncio.get_event_loop()

for i in range(2):
    loop.create_task(producer(i))

for i in range(5):
    locker.append(False)
    loop.create_task(consumer(i))

loop.run_forever()
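
A side note on the shutdown logic: the consumers above stop the loop by checking query.empty() together with the shared locker list. A common alternative, sketched below under the same setup (it reuses query and producer from the answer and replaces the consumer and the run_forever() block; it is not part of the original answer), is to let the queue itself track completion with task_done() and join():

async def consumer(num):
    while True:
        url = await query.get()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    print(await response.read())
        except Exception:
            pass                      # ignore fetch errors in this sketch
        finally:
            query.task_done()         # mark this URL as processed


async def main():
    producers = [asyncio.ensure_future(producer(i)) for i in range(2)]
    consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
    await asyncio.gather(*producers)  # wait until every URL has been queued
    await query.join()                # wait until every queued URL is processed
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())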