2014-01-10 28 views
4

我想在發生第一個任務錯誤後停止執行我的async.queue。我需要在併發限制的同時執行幾個類似的操作,但在第一個錯誤之後停止所有的操作。我該怎麼做,或者我應該使用什麼?如何在第一次失敗後停止async.queue?

+0

有趣的問題。即使其他的「async」函數(例如'each')允許你這樣做,恐怕你不能用'async.queue'來完成。你可以在這裏查看'async.queue'代碼:https://github.com/caolan/async/blob/master/lib/async.js,你會看到沒有錯誤處理。我建議您檢查一下您的需求,因爲您可以使用'async.eachLimit'(https://github.com/caolan/async#eachlimitarr-limit-iterator-callback)替換''async.queue'並且擁有需要的功能,即在第一次失敗時停止執行。 – Tom

+0

如果它們同時執行,當其中一個返回錯誤時如何停止休息。 – user568109

回答

5

假設你發出了5個並行函數,每個函數都需要5秒。在3秒內,功能1失敗。那你怎麼能停止其餘的執行呢?

這取決於這些函數的作用,您可以使用setInterval進行輪詢。但是,如果你的問題是如何阻止進一步的任務被推入隊列。你可以這樣做:

q.push(tasks, function (err) { 
    if (err && !called) { 
     //Will prevent async to push more tasks to the queue, however please note that 
     //whatever pushed to the queue, it will be processed anyway. 
     q.kill(); 

     //This will not allow double calling for the final callback 
     called = true; 


     //This the main process callback, the final callback 
     main(err, results); 
    } 
    }); 

這裏一個完整的工作示例:GitHub的頁面上

var async = require('async'); 

/* 
    This function is the actual work you are trying to do. 
    Please note for example if you are running child processes 
    here, by doing q.kill you will not stop the execution 
    of those processes, so you need actually to keep track the 
    spawned processed and then kill them when you call q.kill 
    in 'pushCb' function. In-case of just long running function, 
    you may poll using setInterval 
*/ 
function worker(task, wcb) { 
    setTimeout(function workerTimeout() { 
    if (task === 11 || task === 12 || task === 3) { 
     return wcb('error in processing ' + task); 
    } 

    wcb(null, task + ' got processed'); 

    }, Math.floor(Math.random() * 100)); 
} 


/* 
    This function that will push the tasks to async.queue, 
    and then hand them to your worker function 
*/ 
function process(tasks, concurrency, pcb) { 
    var results = [], called = false; 

    var q = async.queue(function qWorker(task, qcb) { 

    worker(task, function wcb(err, data) { 
     if (err) { 
     return qcb(err); //Here how we propagate error to qcb 
     } 

     results.push(data); 

     qcb(); 
    }); 

    }, concurrency); 

/* 
    The trick is in this function, note that checking q.tasks.length 
    does not work q.kill introduced in async 0.7.0, it is just setting 
    the drain function to null and the tasks length to zero 
*/ 
    q.push(tasks, function qcb(err) { 
    if (err && !called) { 
     q.kill(); 
     called = true; 
     pcb(err, results); 
    } 
    }); 

    q.drain = function drainCb() { 
    pcb(null, results); 
    } 
} 

var tasks = []; 
var concurrency = 10; 

for (var i = 1; i <= 20; i += 1) { 
    tasks.push(i); 
} 

process(tasks, concurrency, function pcb(err, results) { 
    console.log(results); 

    if (err) { 
    return console.log(err); 
    } 

    console.log('done'); 
}); 
2

異步文檔過期或不正確,同時檢查()方法我通過async.queue返回隊列對象沒有看到方法kill()。

儘管如此,還是有一種解決方法。隊列對象具有屬性任務,這是一個數組,簡單地將一個引用賦給一個空數組對我來說確實是個訣竅。

queue.push(someTasks, function (err) { 

    if (err) queue.tasks = []; 

}); 
+0

是queue.drain()仍然在這裏調用?我想我正在尋找的是某種'終於'功能。我使用drain來確定隊列完成(因爲錯誤或沒有) – rynop

+0

我相信它可以很容易地測試......只需清空隊列上的任務數組,並在漏斗回調中執行調試日誌;)如果我我沒有弄錯,你應該調用漏斗回調 –

相關問題