
Gevent pool getting stuck

I am a gevent newbie, but I think I got it working, in a limited sense anyway. Basically, for pools of 1 the code proceeds, while for larger pools the code gets stuck, usually within the first pool (e.g. with a pool of 5, I see 3 greenlets finishing, but not more). What is going wrong? The spawn? The join?

I cannot verify whether the remote server gets confused by multiple queries, but it had no problem with a quick series of serial requests, so that is probably not it...

(I am sharing the code in its entirety because I am not sure where the error is. Thanks for any help.)

from urllib2 import urlopen
from lxml.etree import parse
import os, sys, csv, cStringIO, codecs, pickle
from selenium import webdriver
from time import sleep
import gevent
from gevent import socket
from gevent import monkey, pool
# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()


class UnicodeWriter:
    """
    A CSV writer which will write rows to CSV file "f",
    which is encoded in the given encoding.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = cStringIO.StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        self.writer.writerow([unicode(s).encode("utf-8") for s in row])
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.decode("utf-8")
        # ... and re-encode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data)
        # empty the queue
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

os.chdir('/Users/laszlosandor/Downloads/kozbeszerzes') 

HOSTNAME = 'http://kozbeszerzes.ceu.hu' 

driver = webdriver.Chrome() 
results = set() 

for y in xrange(1998, 2015):
    for p in xrange(0, 9999):
        driver.get('http://kozbeszerzes.ceu.hu/searchresults.xhtml?q={}&page={}'.format(y, p))
        sleep(1)
        if len(driver.find_elements_by_class_name('result')) == 0:
            break
        for e in driver.find_elements_by_class_name('result'):
            link = e.find_element_by_tag_name('a')
            r = link.get_attribute('href').encode('ascii', 'ignore')
            if r[:34] == 'http://kozbeszerzes.ceu.hu/tender/':
                results.add(r)
driver.quit() 

with open('list_of_urls', 'wb') as f: 
    pickle.dump(results, f) 
#with open('list_of_urls', 'r') as f: 
#  results = pickle.load(f) 

entities = set() 

header = ('TenderID','RequestorName','URL','Year','RequestorID','Subject','SourceURL','EstValue','Currency','DecisionDate','Value','VAT') 

# Spawn multiple workers and wait for them to complete;
# limit ourselves to max 10 simultaneous outstanding requests
p = pool.Pool(10)

f = open('tenders.csv', 'w') 
f.write(codecs.BOM_UTF8) 
writer = UnicodeWriter(f) 
writer.writerow(header) 

def workres(res):
    try:
        tender = parse(urlopen(res)).getroot()
        print('%s succeeded' % res)
        for requestor in tender.findall('requestor'):
            entities.add(HOSTNAME + requestor.get('url'))
        id = tender.get('id')
        reqname = tender.get('requestor')
        url = tender.get('url')
        year = tender.get('year')
        reqid = tender.get('requestor_id')
        subject = tender.get('subject')
        source = tender.get('source_url')
        estval = tender.get('estimated_value')
        for part in tender.findall('./parts/part'):
            winner = part.find('winner')
            entities.add(HOSTNAME + winner.get('url'))
            curr = part.find('currency').text
            date = part.find('decisionDate').text
            value = part.find('value').text
            vat = part.find('vat').text
            row = id, reqname, url, year, reqid, subject, source, estval, curr, date, value, vat
            writer.writerow(row)
    except socket.gaierror:
        ex = sys.exc_info()[1]
        print('%s failed with %s' % (res, ex))

jobs = [p.spawn(workres, res) for res in results] 
p.join() 

f.close() 

with open('entities', 'wb') as f: 
    pickle.dump(entities, f) 

header = ['ID','URL','Name','NominalCity','City', 'ZIP', 'Address'] 

f = open('entities.csv', 'w') 
f.write(codecs.BOM_UTF8) 
writer = UnicodeWriter(f) 
writer.writerow(header) 

def workent(ent):
    try:
        # parse into a separate name so the URL in 'ent' stays printable
        root = parse(urlopen(ent)).getroot()
        print('%s succeeded' % ent)
        id = root.get('id')
        url = root.get('url')
        name = root.get('name')
        nominalcity = root.get('city')
        cities = root.findall('./resolved_addresses/whitelistAddress/city')
        zips = root.findall('./resolved_addresses/whitelistAddress/postalCode')
        streets = root.findall('./resolved_addresses/whitelistAddress/street')
        for a in xrange(0, len(cities)):
            city = cities[a].text
            zip = zips[a].text
            street = streets[a].text
            row = id, url, name, nominalcity, city, zip, street
            writer.writerow(row)
    except socket.gaierror:
        ex = sys.exc_info()[1]
        print('%s failed with %s' % (ent, ex))

jobs = [p.spawn(workent, ent) for ent in entities] 
p.join() 

f.close() 

Answer


I see many errors here.

  • There is no use of gevent.sleep(); time.sleep is blocking (see the first sketch after this list).
  • Your variable names are too short. You could add descriptions of what each part of the code is supposed to do. For example, the variable 'p' is used twice..
  • There are multiple URL fetches, using both urlopen and the webdriver module? Confusing..
  • I would use a queue between the different workers and have just one worker do the writerow calls and deal with the file access; right now you have multiple greenlets accessing the same file (see the second sketch after this list)..
  • Use fewer list comprehensions, just write out the loops.
  • I would suggest putting the try/except in 'workres' only around the 'parse(urlopen())' call; there may be more exceptions happening that you currently don't see.
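
A minimal sketch of the first point, assuming no monkey-patching (the ticker names are illustrative, not from the question): gevent.sleep yields to the hub so greenlets interleave, while a blocking time.sleep freezes them all. Note also that the question does 'from time import sleep' before monkey.patch_all(), so that bound name keeps pointing at the original blocking function even after patching.

import gevent

def ticker(name):
    # cooperative sleep: lets the other greenlet run in the meantime
    for i in range(3):
        print('%s tick %d' % (name, i))
        gevent.sleep(0.1)

# the two tickers interleave; swap gevent.sleep for an unpatched
# time.sleep and 'a' runs to completion before 'b' even starts
gevent.joinall([gevent.spawn(ticker, 'a'), gevent.spawn(ticker, 'b')])

And a minimal sketch of the queue-plus-single-writer pattern from the fourth point, which also keeps the try/except narrow as the last point suggests. The worker body, the DONE sentinel, and the stand-in URL list are illustrative assumptions, not code from the question:

from gevent import monkey
monkey.patch_all()  # patch before the blocking imports below

import csv
import gevent
from gevent import pool
from gevent.queue import Queue
from lxml.etree import parse
from urllib2 import urlopen

rows = Queue()   # rows travel from the workers to the single writer
DONE = object()  # sentinel telling the writer greenlet to stop

def worker(url):
    # keep the try/except around the network/parse call only
    try:
        tender = parse(urlopen(url)).getroot()
    except Exception as ex:
        print('%s failed with %s' % (url, ex))
        return
    for part in tender.findall('./parts/part'):
        rows.put((tender.get('id'), part.find('value').text))

def file_writer(path):
    # the only greenlet that ever touches the file
    with open(path, 'wb') as f:
        writer = csv.writer(f)
        while True:
            row = rows.get()
            if row is DONE:
                break
            writer.writerow(row)

urls = ['http://kozbeszerzes.ceu.hu/tender/example.xml']  # stand-in list
p = pool.Pool(10)
writer_job = gevent.spawn(file_writer, 'tenders.csv')
for url in urls:
    p.spawn(worker, url)
p.join()        # wait for all workers to finish
rows.put(DONE)  # then let the writer drain the queue and exit
writer_job.join()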

more tips for gevent


Thanks. My code is, to be sure, a quick hack, but I still don't understand why the gevent pool does not succeed. Multiple greenlets writing to the same file did not seem to be a problem in the many examples on the web that I simply followed. And the sleep is in the first half of the code, used to scrape the list of addresses that the greenlets eventually fetch, so blocking should not be an issue for the 'workent' function, no? Assume that after the first half I already have the pickled list of entities. Why can't I run through them with a gevent pool? Thanks!