2012-01-27 71 views
3

我希望jobs.create失敗,如果一個相同的工作已經在系統中。有什麼辦法可以實現這個嗎?kue for node.js的獨特工作

我需要每24小時運行一次相同的作業,但有些作業甚至可能需要24小時以上,所以我需要確保作業尚未在系統中(活動,排隊o失敗)添加它。

已更新: 好的,我將簡化問題,以便能夠在這裏解釋它。 唯恐我有一個分析服務,我必須每天向我的用戶發送一次報告。有時完成這些報告(只有少數情況,但有可能)需要幾個小時甚至一天以上。

我需要一種方法來知道哪些是當前正在運行的作業,以避免重複作業。我找不到'''''''''API中的任何內容來知道哪些作業正在運行。當需要更多工作時,我還需要某種事件,然後致電我的生產商getMoreJobs

也許我的方法是錯誤的,如果是這樣,請讓我知道一個更好的方法來解決我的問題。

這是我的簡化代碼:

var kue = require('kue'), 
    cluster = require('cluster'), 
    numCPUs = require('os').cpus().length; 

numCPUs = CONFIG.sync.workers || numCPUs; 

var jobs = kue.createQueue(); 

if (cluster.isMaster) { 
    console.log('Starting master pid:' + process.pid); 
    jobs.on('job complete', function(id){ 
    kue.Job.get(id, function(err, job){ 
     if (err || !job) return; 
     job.remove(function(err){ 
      if (err) throw err; 
      console.log('removed completed job #%d', job.id); 
     }); 
    }); 

    function getMoreJobs() { 
     console.log('looking for more jobs...'); 
     getOutdateReports(function (err, reports) { 
      if (err) return setTimeout(getMoreJobs, 5 * 60 * 60 * 1000); 

      reports.forEach(function(report) { 
       jobs.create('reports', { 
        id: report.id, 
        title: report.name, 
        params: report.params 
       }).attempts(5).save(); 
      }); 

      setTimeout(getMoreJobs, 60 * 60 * 1000); 
     }); 
    } 

    //Create the jobs 
    getMoreJobs(); 

    console.log('Starting ', numCPUs, ' workers'); 
    for (var i = 0; i < numCPUs; i++) { 
     cluster.fork(); 
    } 

    cluster.on('death', function(worker) { 
     console.log('worker pid:' + worker.pid + ' died!'.bold.red); 
    }); 

} else { 
    //Process the jobs 
    console.log('Starting worker pid:' + process.pid); 
    jobs.process('reports', 20, function(job, done){ 
     //completing my work here 
     veryHardWorkGeneratingReports(function(err) { 
      if (err) return done(err); 
      return done(); 
     }); 
    }); 
} 
+0

需要更多信息,代碼或其他... – Teemu 2012-01-27 11:45:32

+0

@Teemu我已更新我的問題,謝謝! – aartiles 2012-01-27 12:38:51

回答

2

https//github.com/LearnBoost/kue

在json.js腳本中檢查行64-112。在那裏你會找到返回一個包含作業的對象的方法,同時也用類型,狀態或id範圍進行過濾。 (jobRange()jobStateRange()jobTypeRange()

主要頁面向下滾動到JSON API -section,你會發現返回的對象的實例。

那如何調用和使用那些你知道比我更好的方法。

jobs.create()將失敗,如果您傳遞未知的關鍵字。我會創建一個函數來檢查forEach -loop中的當前作業,並返回一個關鍵字。然後在jobs.create()-參數中調用該函數而不是文字關鍵字。

通過json.js中的這些方法獲得的信息可以幫助您創建「moreJobToDo」事件。

+0

謝謝,這些方法對我有很大幫助 – aartiles 2012-01-28 09:34:40

3

你的問題之一的答案是,Kue將它從redis隊列中彈出的作業放入「active」中,除非你找到它們,否則你將永遠得不到它們。

另一個問題的答案是,您的分佈式工作隊列是消費者,而不是任務的生產者。像他們一樣盯着它們是可以的,但是,這是一個泥濘的範例。我用Kue所做的是爲kue的json api製作一個包裝器,以便可以從系統中的任何位置將作業放入隊列中。由於您似乎需要剷除工作,因此我建議編寫一個單獨的生產者應用程序,該應用程序除了獲得外部工作並將其粘貼到Kue工作隊列中之外什麼都不做。它可以監視工作隊列,以便在作業運行時間較短並加載批處理時執行,或者,我會做的就是儘可能快地剷除作業,並將多個客戶應用程序實例緩存以處理負載更快速。

重新迭代:您的問題分離在這裏不是很好。你應該有一個完全獨立於你的任務消費者應用程序的任務生產者。這爲您提供了更大的靈活性,易於擴展(只需啓動另一臺計算機上的另一位用戶,然後您就可以進行擴展),並且整體簡化了代碼管理。如果可能的話,您應該允許任何人向您提供這些任務,讓您「查找」訪問您的Kue服務器的JSON API,而不是外出找到它們。工作生產者可以通過Kue安排自己的任務。

+2

(是的,我知道我正在回答一個老問題,但我會投入資金去看SE的Q&A是否包含正確的答案) – 2012-05-08 22:28:10