2012-02-08 42 views
1

我編程應該可以解決多個主機名到使用多線程的IP地址的腳本。的Python socket.gethostbyname_ex()多線程失敗

但是,它未能在一些隨機點凍結。這怎麼解決?

num_threads = 100 
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database') 
cur = conn.cursor() 
def mexec(befehl): 
    cur = conn.cursor() 
    cur.execute(befehl) 

websites = ['facebook.com','facebook.org' ... ... ... ...] \#10.000 websites in array 
queue = Queue() 
def getips(i, q): 
    while True: 
     #--resolve IP-- 
     try: 
      result = socket.gethostbyname_ex(site) 
      print(result) 
      mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb 
     except (socket.gaierror): 
      print("no ip") 
      mexec("UPDATE sites2block SET ip='no ip', updated='yes',") 
     q.task_done() 
#Spawn thread pool 
for i in range(num_threads): 
    worker = Thread(target=getips, args=(i, queue)) 
    worker.setDaemon(True) 
    worker.start() 
#Place work in queue 
for site in websites: 
    queue.put(site) 
#Wait until worker threads are done to exit 
queue.join() 
+0

你會得到什麼錯誤? – 2012-02-08 13:59:17

+0

sry,忘了接受!我沒有得到具體的錯誤,腳本運行,並在某些時候只是凍結而不顯示任何具體的錯誤。然後我必須殺死殼。 – user670186 2012-02-08 14:10:31

+0

這個例子是不完整的 - 什麼是mexec? – 2012-02-08 14:15:47

回答

3

你可以使用一個警戒值信號線,沒有工作並加入線程,而不是queue.task_done()queue.join()

#!/usr/bin/env python 
import socket 
from Queue import Queue 
from threading import Thread 

def getips(queue): 
    for site in iter(queue.get, None): 
     try: # resolve hostname 
      result = socket.gethostbyname_ex(site) 
     except IOError, e: 
      print("error %s reason: %s" % (site, e)) 
     else: 
      print("done %s %s" % (site, result)) 

def main(): 
    websites = "youtube google non-existent.example facebook yahoo live".split() 
    websites = [name+'.com' for name in websites] 

    # Spawn thread pool 
    queue = Queue() 
    threads = [Thread(target=getips, args=(queue,)) for _ in range(20)] 
    for t in threads: 
     t.daemon = True 
     t.start() 

    # Place work in queue 
    for site in websites: queue.put(site) 
    # Put sentinel to signal the end 
    for _ in threads: queue.put(None) 
    # Wait for completion 
    for t in threads: t.join() 

main() 

gethostbyname_ex()功能已經過時了。要支持IPv4/v6地址,您可以使用socket.getaddrinfo()

+0

它從隊列中導入隊列。這個代碼也很重要!謝謝! – user670186 2012-02-10 23:53:50

+1

Python 2.x使用'從隊列導入隊列'。 Python 3.x - 「從隊列導入隊列」符合[pep-8](http://www.python.org/dev/peps/pep-0008/)模塊命名約定。爲了避免混淆,當你問'python-3.x'的問題時,你可以使用標籤'python-3.x'。 – jfs 2012-02-11 00:53:30

1

我的第一個想法是,你因超載的DNS錯誤 - 也許你的解析器只是不允許你這樣做超過一定每次查詢的多。


此外,我發現了一些問題:

  1. 你忘了在while循環正確分配site - 這可能會更好地由for循環遍歷隊列,或東西來代替。在您的版本,您使用site變量從模塊級的命名空間,這可能會導致雙的和其他人跳過查詢。

    在這個地方,你可以控制,如果隊列中仍有條目或等待一些。如果兩者都不是,你可以退出你的線程。

  2. 爲了安全起見,你最好做

    def mexec(befehl, args=None): 
        cur = conn.cursor() 
        cur.execute(befehl, args) 
    

    爲了做到事後

    mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb 
    

爲了留在未來的協議兼容,你應該使用socket.getaddrinfo()而不是socket.gethostbyname_ex(site)。你可以得到你想要的所有IP(起初,你可以限制到IPv4,但切換到IPv6更容易),也可以把它們全部放入數據庫。


爲了您的隊列,代碼樣本可能是

def queue_iterator(q): 
    """Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling.""" 
    while True: 
     try: 
      item = q.get(block=q.is_filling, timeout=.1) 
      yield item 
      q.task_done() # indicate that task is done. 
     except Empty: 
      # If q is still filling, continue. 
      # If q is empty and not filling any longer, return. 
      if not q.is_filling: return 

def getips(i, q): 
    for site in queue_iterator(q): 
     #--resolve IP-- 
     try: 
      result = socket.gethostbyname_ex(site) 
      print(result) 
      mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb 
     except (socket.gaierror): 
      print("no ip") 
      mexec("UPDATE sites2block SET ip='no ip', updated='yes',") 
# Indicate it is filling. 
q.is_filling = True 
#Spawn thread pool 
for i in range(num_threads): 
    worker = Thread(target=getips, args=(i, queue)) 
    worker.setDaemon(True) 
    worker.start() 
#Place work in queue 
for site in websites: 
    queue.put(site) 
queue.is_filling = False # we are done filling, if q becomes empty, we are done. 
#Wait until worker threads are done to exit 
queue.join() 

應該做的伎倆。


另一個問題是你的並行插入MySQL。您一次只能執行一次MySQL查詢。所以,你既可以保護通過threading.Lock()RLock()訪問,或者你可以把答案成被另一個線程,它甚至可以捆綁他們處理的另一隊列。

+0

嗨,謝謝!對於1.你可以發佈一個更正的代碼,我只是沒有得到它的工作... – user670186 2012-02-08 14:47:24

+0

@ user670186完成。剛剛糾正了這個;其他東西沒有整合。 – glglgl 2012-02-08 15:29:19

+0

它可能是簡單的使用阻塞'ITER(q.get,無)'和前哨:在範圍(NUM_THREADS)'因爲我:q.put(無)'和加入會話,而不是不可靠的'q.task_done( ),q.join(),q.is_filling' – jfs 2012-02-08 19:47:48

0

您可能會發現更容易使用concurrent.futuresthreadingmultiprocessingQueue直接:

#!/usr/bin/env python3 
import socket 
# pip install futures on Python 2.x 
from concurrent.futures import ThreadPoolExecutor as Executor 

hosts = "youtube.com google.com facebook.com yahoo.com live.com".split()*100 
with Executor(max_workers=20) as pool: 
    for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60): 
     print(results) 

注意:您可以輕鬆地使用線程進程切換:

from concurrent.futures import ProcessPoolExecutor as Executor 

你需要它,如果gethostbyname_ex()是不是線程安全的在你的OS例如,it might be the case on OSX

如果您想以處理gethostbyname_ex()可能出現的異常:

import concurrent.futures 

with Executor(max_workers=20) as pool: 
    future2host = dict((pool.submit(socket.gethostbyname_ex, h), h) 
         for h in hosts) 
    for f in concurrent.futures.as_completed(future2host, timeout=60): 
     e = f.exception() 
     print(f.result() if e is None else "{0}: {1}".format(future2host[f], e)) 

它類似於the example from the docs