
I work in an environment where web application processes are forked on demand, and each process has its own thread pool to service web requests. The threads may need to issue HTTPS requests to outside services, and the requests library is currently used to do so. When the use of requests was first added, it was used naively, either by creating a new requests.Session and requests.adapters.HTTPAdapter for every request, or even just by calling requests.get or requests.post as needed. The problem that arises is that a new connection is established every time, instead of potentially taking advantage of HTTP persistent connections. A possible fix is to use a connection pool, but what is the recommended way of sharing an HTTP connection pool between threads when using the requests library? Is there one?

The first thought is to share a requests.Session, but that is currently not safe, as described in "Is the Session object from Python's Requests library thread safe?" and "Document threading contract for Session class". Is it safe and sufficient to share a single global requests.adapters.HTTPAdapter between requests.Sessions created on demand in each thread? According to "Our use of urllib3's ConnectionPools is not threadsafe.", even that may not be a valid use. Needing to connect to only a small number of distinct remote endpoints might make this a viable approach.
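
To make the idea concrete, here is a minimal sketch of the pattern I have in mind (the pool sizes and the get_session helper are just illustrative, not a recommendation):

# Illustrative sketch only: one shared HTTPAdapter mounted onto per-thread Sessions.
# Whether sharing the adapter like this is actually thread safe is exactly the question.
import threading
import requests
from requests.adapters import HTTPAdapter

shared_adapter = HTTPAdapter(pool_connections=4, pool_maxsize=4)  # sizes chosen arbitrarily
_thread_local = threading.local()

def get_session():
    # Each thread gets its own Session; all Sessions share the one adapter/pool.
    if not hasattr(_thread_local, "session"):
        session = requests.Session()
        session.mount("https://", shared_adapter)
        session.mount("http://", shared_adapter)
        _thread_local.session = session
    return _thread_local.session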


Does it really have to be requests? I do this using urllib2. I build an object responsible for the tasks of getting from / posting to the server, with useful methods added. Then I create a pool and control the data flow from the outside, that is, from the main thread. It is quite easy. If you don't mind urllib2 instead of requests, I will post some code. – Dalen


@Dalen I wanted to see whether there is a requests-based solution, in order to minimize the amount of code changes needed. – rrrzx

Answer


I doubt there is an existing way to do this in requests. But you could adapt my code to wrap a requests Session() instead of the plain urllib2 calls.

Here is the code I use when I want to fetch data from several sites at the same time:



# Following code I keep in file named do.py 
# It can be used to perform any number of otherwise blocking IO operations simultaneously 
# Results are available to you when all IO operations are completed. 
# Completed means either IO finished successfully or an exception got raised. 
# So when all tasks are completed, you pick up results. 
# Usage example: 
# >>> import do 
# >>> results = do.simultaneously([ 
# ...  (func1, func1_args, func1_kwargs), 
# ...  (func2, func2_args, func2_kwargs), ...]) 
# >>> for x in results: 
# ...  print x 
# ... 
from thread import start_new_thread as thread 
from thread import allocate_lock 
from collections import deque 
from time import sleep 

class Task: 
    """A task's thread holder. Keeps results or exceptions raised. 
    This could be a bit more robustly implemented using 
    threading module. 
    """ 
    def __init__ (self, func, args, kwargs, pool): 
     self.func = func 
     self.args = args 
     self.kwargs = kwargs 
     self.result = None 
     self.done = 0 
     self.started = 0 
     self.xraised = 0 
     self.tasks = pool 
     pool.append(self) 
     self.allow = allocate_lock() 
     self.run() 

    def run (self): 
     thread(self._run,()) 

    def _run (self): 
     self.allow.acquire() # Prevent same task from being started multiple times 
     self.started = 1 
     self.result = None 
     self.done = 0 
     self.xraised = 0 
     try: 
      self.result = self.func(*self.args, **self.kwargs) 
     except Exception, e: 
      e.task = self # Keep reference to the task in an exception 
          # This way we can access original task from caught exception 
      self.result = e 
      self.xraised = 1 
     self.done = 1 
     self.allow.release() 

    def wait (self): 
     while not self.done: 
      try: sleep(0.001) 
      except: break 

    def withdraw (self): 
     if not self.started: self.run() 
     if not self.done: self.wait() 
     self.tasks.remove(self) 
     return self.result 

    def remove (self): 
     self.tasks.remove(self) 

def simultaneously (tasks, xraise=0): 
    """Starts all functions within iterable <tasks>. 
    Then waits for all to be finished. 
    Iterable <tasks> may contain subiterables of the form: 
     (function, [[args,] kwargs]) 
    or just functions. These would be called without arguments. 
    Returns an iterator that yields the result of each called function. 
    If an exception is raised within a task, the Exception() instance will be returned, unless 
    <xraise> is 1 or True. Then the first encountered exception within the results will be raised. 
    Results will start to yield after all funcs() either return or raise an exception. 
    """ 
    pool = deque() 
    for x in tasks: 
     func = lambda: None 
     args =() 
     kwargs = {} 
     if not isinstance(x, (tuple, list)): 
      Task(x, args, kwargs, pool) 
      continue 
     l = len(x) 
     if l: func = x[0] 
     if l>1: 
      args = x[1] 
      if not isinstance(args, (tuple, list)): args = (args,) 
     if l>2: 
      if isinstance(x[2], dict): 
       kwargs = x[2] 
     Task(func, args, kwargs, pool) 
    for t in pool: t.wait() 
    while pool: 
     t = pool.popleft() 
     if xraise and t.xraised: 
      raise t.result 
     yield t.result 

# So, I do this using urllib2, you can do it using requests if you want. 
from urllib2 import URLError, HTTPError, urlopen 
import do 

class AccessError(Exception): 
    """Raised if server rejects us because we bombarded same server with multiple connections in too small time slots.""" 
    pass 

def retrieve (url): 
    try: 
     u = urlopen(url) 
     r = u.read() 
     u.close() 
     return r 
    except HTTPError, e: 
     msg = "HTTPError %i - %s" % (e.code, e.msg) 
     t = AccessError() 
     if e.code in (401, 403, 429): 
      msg += " (perhaps you're making too many calls)" 
      t.reason = "perhaps you are making too many calls" 
     elif e.code in (502, 504): 
      msg += " (service temporarily not available)" 
      t.reason = "service temporarily not available" 
     else: t.reason = e.msg 
     t.args = (msg,) 
     t.message = msg 
     t.msg = e.msg; t.code = e.code 
     t.orig = e 
     raise t 
    except URLError, e: 
     msg = "URLError %s - %s (%s)" % (str(e.errno), str(e.message), str(e.reason)) 
     t = AccessError(msg) 
     t.reason = str(e.reason) 
     t.msg = str(t.message) 
     t.code = e.errno 
     t.orig = e 
     raise t 
    except: raise 

urls = ["http://www.google.com", "http://www.amazon.com", "http://stackoverflow.com", "http://blah.blah.sniff-sniff"] 
retrieval = [] 
for u in urls: 
    retrieval.append((retrieve, u)) 

x = 0 
for data in do.simultaneously(retrieval): 
    url = urls[x] 
    if isinstance(data, Exception): 
     print url, "not retrieved successfully!\nThe error is:" 
     print data 
    else: 
     print url, "returned", len(data), "characters!!\nFirst 100:" 
     print data[:100] 
    x += 1 

# If you need persistent HTTP, you tweak the retrieve() function to be able to hold the connection open. 
# After retrieving the currently requested data, you save the opened connection in a global dict() with domains as keys. 
# When the next retrieve() is called and the domain already has an opened connection, you remove the connection from the dict (so that no other retrieve() grabs it in the middle of a request), then you use it 
# to send a new request if possible (i.e. if it hasn't timed out or something); if the connection broke, you just open a new one. 
# You will probably have to introduce some limits if you use multiple connections to the same server at once. 
# For example, no more than 4 connections to the same server at a time, some delay between requests, and so on. 
# No matter which approach to multithreading you choose (something like I propose or some other mechanism), thread safety is tricky because HTTP is a serialized protocol. 
# You send a request, you await the answer. Only after you receive the whole answer can you make a new request, if HTTP/1.1 is used and the connection is being kept alive. 
# If your thread tries to send a new request while the data is still downloading, a general mess will occur. 
# So you design your system to open as many connections as possible, but always wait for one to be free before reusing it. That's the trick here. 
# As for any other part of requests being thread unsafe for some reason, well, you should check the code to see which calls exactly need to be kept atomic and then use a lock. But don't put it around a block where major IO occurs, or it will be as if you aren't using threads at all. 
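
If you do want to stay with requests, here is a rough sketch of the same "parking" idea using requests Sessions. The per-domain dict, the lock, and the helper names are just illustrative assumptions layered on top of the approach described above, not tested code:

# Rough sketch (not the original code above): retrieve() built on requests,
# with per-domain connection reuse along the lines described in the comments.
# A finished Session is "parked" in a global dict keyed by domain; the next
# retrieve() for that domain pops it out, so no other thread can grab it
# mid-request, and parks it again when done.
from threading import Lock
from urlparse import urlsplit   # Python 2; on Python 3: from urllib.parse import urlsplit
import requests

_parked = {}            # domain -> an idle requests.Session with a live connection
_parked_lock = Lock()   # protects the dict itself, not the Sessions

def _checkout(domain):
    with _parked_lock:
        return _parked.pop(domain, None) or requests.Session()

def _park(domain, session):
    with _parked_lock:
        if domain in _parked:   # keep at most one idle Session per domain
            session.close()
        else:
            _parked[domain] = session

def retrieve(url):
    domain = urlsplit(url).netloc
    session = _checkout(domain)
    try:
        r = session.get(url, timeout=30)
        r.raise_for_status()
        _park(domain, session)  # connection stays open for the next call to this domain
        return r.text
    except Exception:
        session.close()         # connection broke or server refused: start fresh next time
        raise

This keeps at most one idle connection per domain; as noted above, you may also want a cap on concurrent connections per server and some delay between requests.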
