2014-06-20 175 views
10

我正在使用 Node.js 的 async 和 request 模塊抓取一億多(100+ million)個網站,但運行幾分鐘後就不斷遇到 ESOCKETTIMEDOUT 和 ETIMEDOUT 錯誤。(標題:Node.js 的 GET 請求 ETIMEDOUT & ESOCKETTIMEDOUT)

重新啓動腳本後,它又能正常工作。這似乎不是連接數限制的問題,因爲出錯時我仍然可以毫無延遲地執行 resolve4、resolveNs、resolveMx 等查詢。

您是否看出代碼中有什麼問題?或者有什麼建議?我想把 async.queue() 的併發數提高到至少 1000。謝謝。

var request = require('request'), 
    async = require('async'), 
    mysql = require('mysql'), 
    dns = require('dns'), 
    url = require('url'), 
    cheerio = require('cheerio'), 
    iconv = require('iconv-lite'), 
    charset = require('charset'), 
    config = require('./spy.config'), 
    pool = mysql.createPool(config.db); 

// NOTE(review): presumably silences iconv-lite's warning about decoding
// string (non-Buffer) input, which the 'binary'-encoded response bodies
// requested below would otherwise trigger — confirm against the iconv-lite docs.
iconv.skipDecodeWarning = true; 

/**
 * Crawl queue. Each task is an object with a `domain` property (tasks are
 * pushed by crawlQueue() as `{ id, domain }`). At most 200 tasks run at once.
 *
 * For every task the worker:
 *   1. resolves `www.<domain>` to make sure the host exists;
 *   2. issues a GET request to `http://www.<domain>` with a 15 s timeout;
 *   3. on a request error, logs it and gathers the NS, A and MX records of
 *      the bare domain.
 *
 * The task is reported as finished only after ALL of its work, including the
 * fallback DNS lookups, has completed, so the concurrency limit really bounds
 * the number of outstanding network operations.
 *
 * @param {{domain: string}} task - Domain to crawl.
 * @param {Function} cb - Queue callback; invoked exactly once per task.
 */
var queue = async.queue(function (task, cb) {
    // Report completion on a later turn of the event loop, as the original
    // code did, so the queue never re-enters the worker synchronously.
    function finish() {
        setImmediate(function () {
            cb();
        });
    }

    dns.resolve4('www.' + task.domain, function (err, addresses) {
        if (err) {
            //
            // Do something
            //
            finish();
            return;
        }

        request({
            url: 'http://www.' + task.domain,
            method: 'GET',
            encoding: 'binary',
            followRedirect: true,
            // A duplicate `pool: false` key used to precede this one. In an
            // object literal the last duplicate wins, so it never took effect
            // and has been removed.
            pool: { maxSockets: 1000 },
            timeout: 15000 // 15 sec
        }, function (error, response, body) {

            //console.info(task);

            if (!error) {
                // If ok, do something

                finish();
                return;
            }

            // If not ok, do these

            console.log(error);

            // It keeps erroring here after few minutes, resolve4, resolveNs, resolveMx still work here.

            // { [Error: ETIMEDOUT] code: 'ETIMEDOUT' }
            // { [Error: ESOCKETTIMEDOUT] code: 'ESOCKETTIMEDOUT' }

            var ns = [],
                ip = [],
                mx = [];

            // Each lookup swallows its own error and leaves its list empty,
            // so the final callback never receives an error.
            async.parallel([
                function (callback) {
                    // Resolves the domain's name server records
                    dns.resolveNs(task.domain, function (err, addresses) {
                        if (!err) {
                            ns = addresses;
                        }
                        callback();
                    });
                }, function (callback) {
                    // Resolves the domain's IPV4 addresses
                    dns.resolve4(task.domain, function (err, addresses) {
                        if (!err) {
                            ip = addresses;
                        }
                        callback();
                    });
                }, function (callback) {
                    // Resolves the domain's MX records
                    dns.resolveMx(task.domain, function (err, addresses) {
                        if (!err) {
                            addresses.forEach(function (a) {
                                mx.push(a.exchange);
                            });
                        }
                        callback();
                    });
                }
            ], function () {
                // do something (ns, ip and mx are populated at this point)

                // Previously the task was completed before these lookups had
                // finished, and the error path called an undefined `next`.
                finish();
            });
        });
    });
}, 200);

// Once the queue has been emptied, defer a checkDone() call to a later turn
// of the event loop to decide whether another batch should be loaded.
queue.drain = function () {
    setImmediate(checkDone);
};
/**
 * Debug logging hook. Output is currently disabled, so every call is a no-op.
 *
 * @param {*} msg - Message that would be printed if tracing were enabled.
 */
function consoleLog(msg) {
    // Tracing is off; restore `console.info(msg);` here to turn it back on.
}
/**
 * Schedules the next crawlQueue() batch when no tasks are waiting in the
 * queue; otherwise only logs that the queue is not empty.
 */
function checkDone() {
    var waiting = queue.length();

    if (waiting != 0) {
        console.log("checkDone() not zero");
        return;
    }

    // Deferred so the next batch is loaded on a later turn of the event loop.
    setImmediate(function () {
        crawlQueue();
    });
}

/**
 * Runs a fire-and-forget SQL statement on a pooled connection.
 *
 * The result set is discarded. Failures to obtain a connection or to run the
 * statement are reported on stderr instead of being silently dropped. The
 * connection is always handed back to the pool once the statement finishes.
 *
 * @param {string} sql - Statement to execute.
 */
function query(sql) {
    pool.getConnection(function (connErr, connection) {
        if (connErr) {
            console.error('query(): could not get a connection', connErr);
            return;
        }

        //console.log(sql);
        connection.query(sql, function (queryErr, results) {
            // Release first so the connection is returned on every outcome.
            connection.release();

            if (queryErr) {
                console.error('query(): statement failed', queryErr);
            }
        });
    });
}

/**
 * Loads the next batch of up to 500 domains whose `last_update` is more than
 * 2592000 seconds (30 days) old and pushes them onto the crawl queue as
 * `{ id, domain }` tasks.
 *
 * If no connection can be obtained, or the statement fails, the function
 * retries on a later turn of the event loop. When the statement returns no
 * rows there is nothing left to crawl and the process exits.
 */
function crawlQueue() {
    // Retry on a later turn of the event loop rather than recursing directly.
    function retry() {
        setImmediate(function () {
            crawlQueue();
        });
    }

    pool.getConnection(function (connErr, connection) {
        if (connErr) {
            retry();
            return;
        }

        // The WHERE keyword was missing, which made the statement invalid SQL
        // and sent this function into an endless retry loop.
        var sql = "SELECT * FROM domain WHERE last_update < (UNIX_TIMESTAMP() - 2592000) LIMIT 500";

        connection.query(sql, function (queryErr, results) {
            // Hand the connection back before branching so that every path,
            // including the one that exits the process, releases it.
            connection.release();

            if (queryErr) {
                retry();
                return;
            }

            if (!results.length) {
                process.exit();
                return;
            }

            for (var i = 0, len = results.length; i < len; ++i) {
                queue.push({ "id": results[i]['id'], "domain": results[i]['domain'] });
            }
        });
    });
}
// Kick off the first batch on a later turn of the event loop.
setImmediate(crawlQueue);

並且系統限制相當高。

Limit      Soft Limit   Hard Limit   Units 
    Max cpu time    unlimited   unlimited   seconds 
    Max file size    unlimited   unlimited   bytes 
    Max data size    unlimited   unlimited   bytes 
    Max stack size   8388608    unlimited   bytes 
    Max core file size  0     unlimited   bytes 
    Max resident set   unlimited   unlimited   bytes 
    Max processes    257645    257645    processes 
    Max open files   500000    500000    files 
    Max locked memory   65536    65536    bytes 
    Max address space   unlimited   unlimited   bytes 
    Max file locks   unlimited   unlimited   locks 
    Max pending signals  257645    257645    signals 
    Max msgqueue size   819200    819200    bytes 
    Max nice priority   0     0 
    Max realtime priority  0     0 
    Max realtime timeout  unlimited   unlimited   us 

sysctl 設置:

net.ipv4.ip_local_port_range = 10000 61000 
+0

爲什麼(request 的)pool 選項設置了兩次? – dandavis

+0

那是爲了禁用連接池。無論是否使用連接池和 maxSockets,我仍然會得到這些錯誤。 –

+7

你能找到原因嗎? – Cmag

回答

7

默認情況下,Node 有 4 個工作線程(workers)用來解析 DNS 查詢。如果您的 DNS 查詢耗時較長,請求就會阻塞在 DNS 階段,其症狀正好是 ESOCKETTIMEDOUT 或 ETIMEDOUT。

嘗試提高 UV 線程池的大小(我在本地通過使用 tc 減慢 DNS 服務器的響應,重現了這個問題):

#!/usr/bin/env node 
// Request a larger libuv thread pool (128 workers) for this process.
// NOTE(review): presumably this has to run before the thread pool is first
// used, hence its position at the very top of the entry point — confirm.
process.env.UV_THREADPOOL_SIZE = 128; 

// Placeholder for the program's real entry point.
function main() { 
    ... 
} 

我在本地重現了這個問題。也可以在啓動 node 之前,通過環境變量設置線程池大小:

# Export the thread pool size so the node process started below inherits it.
export UV_THREADPOOL_SIZE=128 
node ... 

index.js(或者您的入口文件,無論它叫什麼)。

+0

我有同樣的問題,但是這種解決方法對我不起作用。有什麼想法嗎? –

+0

不適用於我。 – 3zzy

2

我有同樣的問題。在閱讀this discussion後,通過在請求選項中使用「agent:false」來解決。

10/31/2017 上述內容似乎並未完全解決問題。我們發現的最終解決方案是在代理中使用keepAlive選項。例如:

// Shared HTTPS agent created with the keepAlive option switched on.
var pool = new https.Agent({ keepAlive: true });

/**
 * Builds the options for a JSON GET request that goes through the shared
 * keep-alive agent.
 *
 * @param {string} _url - Address to request.
 * @returns {{url: string, method: string, agent: Object, json: boolean}}
 *   A fresh options object on every call.
 */
function getJsonOptions(_url) {
    var options = {};

    options.url = _url;
    options.method = 'GET';
    options.agent = pool;
    options.json = true;

    return options;
}

Node 的默認連接池似乎默認爲 keepAlive = false,這會導致每個請求都創建一個新的連接。如果在短時間內創建了太多的連接,上述錯誤就會出現。我的猜測是,服務路徑上的一個或多個路由器阻止了連接請求,可能是因爲懷疑遇到了拒絕服務攻擊。無論如何,上面的代碼示例完全解決了我們的問題。