2017-02-22

Twisted/Python - process a large file line by line

I have this code reading a file and processing it. The file is quite large, about 12 million lines, so at the moment I split it into 1000-line files by hand and launch a process for each 1000-line chunk sequentially (via a bash script).

Is there a way to use Twisted to load one file and process 1000 items at a time from it (a progress bar would be nice), without me having to split it up manually?

scanner.py

import argparse 

from tqdm import tqdm 
from sys import argv 
from pprint import pformat 

from twisted.internet.task import react 
from twisted.web.client import Agent, readBody 
from twisted.web.http_headers import Headers 

import lxml.html 

from geoip import geolite2 
import pycountry 

from tld import get_tld 
import json 
import socket 
import re 

poweredby = "" 
server = "" 
ip = "" 


def cbRequest(response, url): 
    global poweredby, server, ip 
    # print 'Response version:', response.version 
    # print 'Response code:', response.code 
    # print 'Response phrase:', response.phrase 
    # print 'Response headers:' 
    # print pformat(list(response.headers.getAllRawHeaders())) 
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0] 
    server = response.headers.getRawHeaders("Server")[0] 

    #print poweredby 
    #print server 

    d = readBody(response) 
    d.addCallback(cbBody, url) 
    return d 


def cbBody(body, ourl): 
    global poweredby, server,ip 

    #print body 
    html_element = lxml.html.fromstring(body) 
    generator = html_element.xpath("//meta[@name='generator']/@content") 

    ip = socket.gethostbyname(ourl) 

    try: 
     match = geolite2.lookup(ip) 
     if match is not None: 
      country = match.country 
      try: 

       c = pycountry.countries.lookup(country) 
       country = c.name 
      except: 
       country = "" 

    except: 
     country = "" 
    try: 
     res = get_tld("http://www" + ourl, as_object=True) 
     tld = res.suffix 
    except: 
     tld = "" 

    try: 
     match = re.search(r'[\w\.-]+@[\w\.-]+', body) 
     email = match.group(0) 
    except: 
     email = "" 

    permalink=ourl.rstrip().replace(".","-") 

    try: 
     item = generator[0] 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 
    except: 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 


    print val 

if __name__ == '__main__': 
    parser = argparse.ArgumentParser(description='Scanner v0.99') 
    parser.add_argument(
     '-i', '--input', help='Input list of domains', required=True) 
    args = parser.parse_args() 
    input = args.input 

with open(input) as f: 
    urls = f.read().splitlines() 


def mainjob(reactor, urls=urls): 
    for url in tqdm(urls): 
     agent = Agent(reactor) 
     d = agent.request(
      'GET', "http://" + url, 
      Headers({'User-Agent': ['bot']}), 
      None) 
     d.addCallback(cbRequest, url) 
     d.addErrback(lambda x: None) # ignore errors 
    return d 


react(mainjob, argv[3:]) 
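One cleanup worth noting (not from the original post): the hand-concatenated JSON strings in cbBody splice ip, tld and country into the output without any escaping. A sketch of building the same record as a dict and letting json.dumps do all of the quoting; build_record and its parameter names are illustrative, not part of the script:

```python
import json

def build_record(domain, ip, server, poweredby, generator, email, tld, country):
    # json.dumps handles quoting and escaping for every field.
    record = {
        "Domain": "http://" + domain.rstrip(),
        "IP": ip,
        "Server": server,
        "PoweredBy": poweredby,
        # The missing-generator case is just an empty list, so no try/except is needed.
        "MetaGenerator": generator[0] if generator else "",
        "Email": email,
        "Suffix": tld,
        "CountryHosted": country,
        "permalink": domain.rstrip().replace(".", "-"),
    }
    return json.dumps(record)
```

This would replace both try/except branches that assemble val by hand.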

Update 1:

Currently I run it like this:

file.txt - 12,000,000 lines

chunk01.txt - files of 1000 lines each . . .

I run the script once for each chunk file:

python scanner.py chunk01.txt 
python scanner.py chunk02.txt 
. 
. 
. 

Running the script only once would be:

python scanner.py file.txt 

The problem is that I need to pass the URLs as an argument to react(). If I read all 12,000,000 lines into memory (via f.read()), it is too big. That's why I split the file and run the script on each small file.

Hopefully it's clearer now...

Update 2:

Based on @Jean-Paul Calderone's answer, I made this code.

It seems to work, but something puzzles me:

Over 180,000 iterations... I would assume 180,000 domains (one per line of the input file), yet the script only prints/outputs about 35,707 lines (entries). I expected it to be close to 180,000... I know some domains will time out. When I ran it the 'old' way it was much more consistent: the number of input domains was close to the number of output lines in the output file.

Could there be something 'bad' in the code? Any ideas?

python scanner.py > out.txt 

181668it [1:47:36, 4.82it/s] 

and counting the lines:

wc -l out.txt 
36840 out.txt 

scanner.py

import argparse 

from tqdm import tqdm 
from sys import argv 
from pprint import pformat 

from twisted.internet.task import react 
from twisted.web.client import Agent, readBody 
from twisted.web.http_headers import Headers 
from twisted.internet.task import cooperate 
from twisted.internet.defer import gatherResults 

import lxml.html 

from geoip import geolite2 
import pycountry 

from tld import get_tld 
import json 
import socket 
import re 

poweredby = "" 
server = "" 
ip = "" 


def cbRequest(response, url): 
    global poweredby, server, ip 
    # print 'Response version:', response.version 
    # print 'Response code:', response.code 
    # print 'Response phrase:', response.phrase 
    # print 'Response headers:' 
    # print pformat(list(response.headers.getAllRawHeaders())) 
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0] 
    server = response.headers.getRawHeaders("Server")[0] 

    #print poweredby 
    #print server 

    d = readBody(response) 
    d.addCallback(cbBody, url) 
    return d 


def cbBody(body, ourl): 
    global poweredby, server,ip 

    #print body 
    html_element = lxml.html.fromstring(body) 
    generator = html_element.xpath("//meta[@name='generator']/@content") 

    ip = socket.gethostbyname(ourl) 

    try: 
     match = geolite2.lookup(ip) 
     if match is not None: 
      country = match.country 
      try: 

       c = pycountry.countries.lookup(country) 
       country = c.name 
      except: 
       country = "" 

    except: 
     country = "" 
    try: 
     res = get_tld("http://www" + ourl, as_object=True) 
     tld = res.suffix 
    except: 
     tld = "" 

    try: 
     match = re.search(r'[\w\.-]+@[\w\.-]+', body) 
     email = match.group(0) 
    except: 
     email = "" 

    permalink=ourl.rstrip().replace(".","-") 

    try: 
     item = generator[0] 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 
    except: 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 


    print val 


def main(reactor, url_path): 
    urls = open(url_path) 
    return mainjob(reactor, (url.strip() for url in urls)) 

def mainjob(reactor, urls=argv[2:]): 
    #for url in urls: 
    # print url 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    tasks = list(cooperate(work) for i in range(100)) 
    return gatherResults(list(task.whenDone() for task in tasks)) 



def process(agent, url): 
    d = agent.request(
     'GET', "http://" + url, 
     Headers({'User-Agent': ['bot']}), 
     None) 
    d.addCallback(cbRequest, url) 
    d.addErrback(lambda x: None) # ignore errors 
    return d 

react(main, ["./domains.txt"]) 

Update 3:

Updated code to print errors to errors.txt:

import argparse 

from tqdm import tqdm 
from sys import argv 
from pprint import pformat 

from twisted.internet.task import react 
from twisted.web.client import Agent, readBody 
from twisted.web.http_headers import Headers 
from twisted.internet.task import cooperate 
from twisted.internet.defer import gatherResults 

import lxml.html 

from geoip import geolite2 
import pycountry 

from tld import get_tld 
import json 
import socket 
import re 

poweredby = "" 
server = "" 
ip = "" 

f = open("errors.txt", "w") 


def error(response, url): 
    f.write("Error: "+url+"\n") 


def cbRequest(response, url): 
    global poweredby, server, ip 
    # print 'Response version:', response.version 
    # print 'Response code:', response.code 
    # print 'Response phrase:', response.phrase 
    # print 'Response headers:' 
    # print pformat(list(response.headers.getAllRawHeaders())) 
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0] 
    server = response.headers.getRawHeaders("Server")[0] 

    #print poweredby 
    #print server 

    d = readBody(response) 
    d.addCallback(cbBody, url) 
    return d 


def cbBody(body, ourl): 
    global poweredby, server,ip 

    #print body 
    html_element = lxml.html.fromstring(body) 
    generator = html_element.xpath("//meta[@name='generator']/@content") 

    ip = socket.gethostbyname(ourl) 

    try: 
     match = geolite2.lookup(ip) 
     if match is not None: 
      country = match.country 
      try: 

       c = pycountry.countries.lookup(country) 
       country = c.name 
      except: 
       country = "" 

    except: 
     country = "" 
    try: 
     res = get_tld("http://www" + ourl, as_object=True) 
     tld = res.suffix 
    except: 
     tld = "" 

    try: 
     match = re.search(r'[\w\.-]+@[\w\.-]+', body) 
     email = match.group(0) 
    except: 
     email = "" 

    permalink=ourl.rstrip().replace(".","-") 

    try: 
     item = generator[0] 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 
    except: 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 


    print val 


def main(reactor, url_path): 
    urls = open(url_path) 
    return mainjob(reactor, (url.strip() for url in urls)) 

def mainjob(reactor, urls=argv[2:]): 
    #for url in urls: 
    # print url 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    tasks = list(cooperate(work) for i in range(100)) 
    return gatherResults(list(task.whenDone() for task in tasks)) 



def process(agent, url): 
    d = agent.request(
     'GET', "http://" + url, 
     Headers({'User-Agent': ['crawler']}), 
     None) 
    d.addCallback(cbRequest, url) 
    d.addErrback(error, url) 
    return d 

react(main, ["./domains.txt"]) 

f.close() 

Update 4:

I captured the traffic with Wireshark, using just 2 domains that errored before:

user@host:~/crawler$ python scanner.py 
2it [00:00, 840.71it/s] 
user@host:~/crawler$ cat errors.txt 
Error: google.al 
Error: fau.edu.al 

As you can see they errored, but in Wireshark I can see responses for them:

(screenshot: Wireshark capture showing HTTP responses for both domains)


You should process your file like this: "for line in f". f.read() will read the whole file into memory. Have a look here: http://stackoverflow.com/a/8010133/1904113 – MKesper


Thanks! I'm aware of that, but I need to pass it as an argument (url) to twisted (react()). Reading line by line wouldn't get the program to that point; it would stop at reading the file. New to reactors and Python. –


I don't quite understand what you're trying to get the new code to do. You lost me at "process 1000 items at a time from a file". Can you try restating the goal? –

Answer


You need to add a limit to the amount of concurrency your program creates. Currently, you process all of the URLs at the same time - or try to, at least:

def mainjob(reactor, urls=urls): 
    for url in tqdm(urls): 
     agent = Agent(reactor) 
     d = agent.request(
      'GET', "http://" + url, 
      Headers({'User-Agent': ['bot']}), 
      None) 
     d.addCallback(cbRequest, url) 
     d.addErrback(lambda x: None) # ignore errors 
    return d 

The problem is that this issues a request for every URL without waiting for any of them to complete. Instead, use twisted.internet.task.cooperate to run a limited number at a time. This runs one request at a time:

def mainjob(reactor, urls): 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    task = cooperate(work) 
    return task.whenDone() 

def process(agent, url): 
    d = agent.request(
     'GET', "http://" + url, 
     Headers({'User-Agent': ['bot']}), 
     None) 
    d.addCallback(cbRequest, url) 
    d.addErrback(lambda x: None) # ignore errors 
    return d 

You probably want more than that, though. So, call cooperate() a few more times:

def mainjob(reactor, urls=urls): 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    tasks = list(cooperate(work) for i in range(100)) 
    return gatherResults(list(task.whenDone() for task in tasks)) 

This runs at most 100 requests at a time. Each task pulls the next element from work and waits on it. gatherResults waits for all 100 tasks to finish.
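The crucial detail is that all 100 tasks share one generator, so each URL is pulled exactly once no matter how many tasks are running. A plain-Python sketch of that sharing, without Twisted (take is an illustrative helper, not part of the cooperate API):

```python
def take(shared, n):
    # Pull up to n items from a shared iterator, like one cooperator task.
    out = []
    for _ in range(n):
        try:
            out.append(next(shared))
        except StopIteration:
            break
    return out

work = iter(["a.com", "b.com", "c.com", "d.com", "e.com"])
task1 = take(work, 2)  # consumes "a.com", "b.com"
task2 = take(work, 2)  # consumes "c.com", "d.com"
task3 = take(work, 2)  # only "e.com" is left; the iterator is then empty
```

Because every consumer advances the same iterator, no URL is processed twice and the work is drained exactly once.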

Now just avoid loading the complete input into memory at once:

def main(reactor, url_path): 
    urls = open(url_path) 
    return mainjob(reactor, (url.strip() for url in urls)) 

react(main, ["path-to-urls.txt"]) 

This opens the URL file but only reads lines from it as they are needed.
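A small illustration of the difference, using a hypothetical temporary file: iterating over the file object reads lines only on demand, whereas f.read() would pull all 12 million lines into memory at once.

```python
import os
import tempfile

# Hypothetical input file, for illustration only.
fd, path = tempfile.mkstemp()
os.close(fd)
with open(path, "w") as f:
    f.write("example.com\ntest.org\nfoo.net\n")

handle = open(path)
urls = (line.strip() for line in handle)  # nothing has been read yet
first = next(urls)                        # reads just the first line
rest = list(urls)                         # reads the remainder on demand
handle.close()
os.remove(path)
```

This is exactly the shape of the generator expression passed to mainjob above.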


THAAAAAAAANKS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Perfect answer! –


Hmm.. I've been running the code based on your answer for a while, but it behaves strangely... I'd assume the amount of input would be close to the amount of output (the number of input domains should produce a close number of output lines in the output file; some will time out etc.). I described it in Update 2: @Jean-Paul Calderone do you know why? –


A good next step would be to make it count failures. The code just throws them away right now. If you count the failures, you can check whether the total of successes + failures is what you expect, and then go on to look at the failures to see whether anything can be improved. If the total is still too small, you can look at the driver to see where/how it is dropping work. –
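A sketch of the counting this comment suggests; Stats, record_success and record_failure are illustrative names, not from the script. In the real program they would be attached with d.addCallbacks(record_success, record_failure) inside process(), and the totals printed after react() returns.

```python
class Stats(object):
    def __init__(self):
        self.success = 0
        self.failure = 0

stats = Stats()

def record_success(result):
    stats.success += 1
    return result  # pass the result through the callback chain

def record_failure(reason):
    stats.failure += 1
    return None    # swallow the error, but remember that it happened

# Simulated outcomes standing in for finished Deferreds:
for outcome in ["ok", "ok", "timeout", "ok"]:
    if outcome == "ok":
        record_success(outcome)
    else:
        record_failure(outcome)
```

If stats.success + stats.failure does not match the input line count, the driver (not the per-request handling) is dropping work.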