2015-10-15 28 views
1

我正在使用Nodejs作爲後端。我試過this npm包創建一個簡單的工作流程(AMAZON-SWF)。該軟件包有一個示例文件夾,其中包含我放在我的節點項目中的文件,以便我瞭解其工作原理。爲什麼descisionTask沒有從AWS-SWF服務(SWF)接收任何任務?

問題是Decider沒有收到來自SWF服務器的任何任務。因爲我的工作流程從未運行。是否有一些配置問題。請指出我所做的錯誤。

以下是用於快速參考的代碼。代碼唯一的變化就是版本號的變化和域名的變化。否則,它與代碼相同,您可以找到here

以下是決定代碼。

var swf = require('./index'); 

var myDecider = new swf.Decider({ 
    "domain": "test-domain", 
    "taskList": {"name": "my-workflow-tasklist"}, 
    "identity": "Decider-01", 
    "maximumPageSize": 100, 
    "reverseOrder": false // IMPORTANT: must replay events in the right order, ie. from the start 
}); 

myDecider.on('decisionTask', function (decisionTask) { 

    console.log("Got a new decision task !"); 

    if(!decisionTask.eventList.scheduled('step1')) { 
     decisionTask.response.schedule({ 
      name: 'step1', 
      activity: 'simple-activity' 
     }); 
    } 
    else { 
     decisionTask.response.stop({ 
      result: "some workflow output data" 
     }); 
    } 

    decisionTask.response.respondCompleted(decisionTask.response.decisions, function(err, result) { 

     if(err) { 
      console.log(err); 
      return; 
     } 

     console.log("responded with some data !"); 
    }); 

}); 

myDecider.on('poll', function(d) { 
    //console.log(_this.config.identity + ": polling for decision tasks..."); 
    console.log("polling for tasks...", d); 
}); 

// Start polling 
myDecider.start(); 



/** 
* It is not recommanded to stop the poller in the middle of a long-polling request, 
* because SWF might schedule an DecisionTask to this poller anyway, which will obviously timeout. 
* 
* The .stop() method will wait for the end of the current polling request, 
* eventually wait for a last decision execution, then stop properly : 
*/ 
process.on('SIGINT', function() { 
    console.log('Got SIGINT ! Stopping decider poller after this request...please wait...'); 
    myDecider.stop(); 
}); 

以下是活動碼:

/** 
* This simple worker example will respond to any incoming task 
* on the 'my-workflow-tasklist, by setting the input parameters as the results of the task 
*/ 

var swf = require('./index'); 

var activityPoller = new swf.ActivityPoller({ 
    domain: 'test-domain-newspecies', 
    taskList: { name: 'my-workflow-tasklist' }, 
    identity: 'simple-activity' 
}); 

activityPoller.on('error',function() { 
    console.log('error'); 
}); 

activityPoller.on('activityTask', function(task) { 
    console.log("Received new activity task !"); 
    var output = task.input; 

    task.respondCompleted(output, function (err) { 

     if(err) { 
      console.log(err); 
      return; 
     } 

     console.log("responded with some data !"); 
    }); 
}); 


activityPoller.on('poll', function(d) { 
    console.log("polling for activity tasks...", d); 
}); 

activityPoller.on('error', function(error) { 
    console.log(error); 
}); 


// Start polling 
activityPoller.start(); 


/** 
* It is not recommanded to stop the poller in the middle of a long-polling request, 
* because SWF might schedule an ActivityTask to this poller anyway, which will obviously timeout. 
* 
* The .stop() method will wait for the end of the current polling request, 
* eventually wait for a last activity execution, then stop properly : 
*/ 
process.on('SIGINT', function() { 
    console.log('Got SIGINT ! Stopping activity poller after this request...please wait...'); 
    activityPoller.stop(); 
}); 

以下是哪些寄存器的代碼:

var awsswf = require('./index'); 
var swf = awsswf.createClient(); 
/** 
* Register the domain "test-domain" 
*/ 
swf.registerDomain({ 
    name: "test-domain-newspecies", 
    description: "this is a just a test domain", 
    workflowExecutionRetentionPeriodInDays: "3" 
}, function (err, results) { 

    if (err && err.code != 'DomainAlreadyExistsFault') { 
     console.log("Unable to register domain: ", err); 
     return; 
    } 
    console.log("'test-domain-newspecies' registered !") 


    /** 
    * Register the WorkflowType "simple-workflow" 
    */ 
    swf.registerWorkflowType({ 
     domain: "test-domain-newspecies", 
     name: "simple-workflow", 
     version: "2.0" 
    }, function (err, results) { 

     if (err && err.code != 'TypeAlreadyExistsFault') { 
      console.log("Unable to register workflow: ", err); 
      return; 
     } 
     console.log("'simple-workflow' registered !") 

     /** 
     * Register the ActivityType "simple-activity" 
     */ 
     swf.registerActivityType({ 
      domain: "test-domain-newspecies", 
      name: "simple-activity", 
      version: "2.0" 
     }, function (err, results) { 

      if (err && err.code != 'TypeAlreadyExistsFault') { 
       console.log("Unable to register activity type: ", err); 
       return; 
      } 

      console.log("'simple-activity' registered !"); 
     }); 

    }); 

}); 

以下是其中啓動工作流執行的代碼:

var swf = require('./index'); 

var workflow = new swf.Workflow({ 
    "domain": "test-domain-newspecies", 
    "workflowType": { 
     "name": "simple-workflow", 
     "version": "2.0" 
    }, 
    "taskList": { "name": "my-workflow-tasklist" }, 
    "executionStartToCloseTimeout": "1800", 
    "taskStartToCloseTimeout": "1800", 
    "tagList": ["example"], 
    "childPolicy": "TERMINATE" 
}); 


var workflowExecution = workflow.start({ input: "any data ..."}, function (err, runId) { 

    if (err) { console.log("Cannot start workflow : ", err); return; } 

    console.log("Workflow started, runId: " +runId); 

}); 

以下是index.js文件

var basePath = "../node_modules/aws-swf/lib/"; 
exports.AWS = require('aws-swf').AWS; 
exports.AWS.config.loadFromPath(__dirname + '/../config/awsConfig.json'); 
exports.createClient = require(basePath+"swf").createClient; 
exports.Workflow = require(basePath+"workflow").Workflow; 
exports.WorkflowExecution = require(basePath+"workflow-execution").WorkflowExecution; 
exports.ActivityPoller = require(basePath+"activity-poller").ActivityPoller; 
exports.ActivityTask = require(basePath+"activity-task").ActivityTask; 
exports.Decider = require(basePath+"decider").Decider; 
exports.DecisionTask = require(basePath+"decision-task").DecisionTask; 
exports.EventList = require(basePath+"event-list").EventList; 
exports.DecisionResponse = require(basePath+"decision-response").DecisionResponse; 
exports.Poller = require(basePath+"poller").Poller; 

運行此代碼的方式是同時打開三個終端。然後我在各自的終端執行以下命令。

activity 
node <activity-file-name> 

decider 
node <decider-file-name> 

start and register I run in the same terminal. 
node <register-file-name> 
node <start-file-name> 

回答

2

它代表的是在決勝局您使用"test-domain",但在代碼的其餘部分使用的是"test-domain-newspecies"

如果域"test-domain"未註冊,則在輪詢決策任務時應得到UnknownResourceFault錯誤。

+1

順便說一句,這個問題在SWF中很常見 - 你經常有不同的進程,必須保持配置在所有的配置中同步。 – Kobi

+0

這解決了我的決策任務的問題 - 非常感謝。我仍然不知道爲什麼決策任務不能開始簡單活動。當我登錄AWS控制檯時,我得到這個「ScheduleActivityTaskFailed」,因爲它正在調用其版本爲1.0的活動(現在已棄用)。它應該在2.0版本中被稱爲活動。我在哪裏提到它應該使用2.0。 **我不明白的是,SWF如何知道要調用哪個版本,因爲活動輪詢器和決策者輪詢器都不會讓您提及該版本。** – nsp

+0

想通了。代碼中的一些錯誤。採取靜態版本1.0 http://neyric.github.io/aws-swf/apidoc/decision-response.js.html行號。 169 – nsp

相關問題