2017-01-16 42 views
0

問題:我需要控制執行順序,在這個順序中任務由foreach循環並行處理。不幸的是,這不被foreach支持。R - doRedis - 覆蓋getTask來控制並行foreach循環中的執行順序

解決方法記住:使用doRedis使用數據庫來保存所有在foreach循環中執行的任務。爲了控制順序,我想通過setGetTask覆蓋getTask來獲取基於預先指定順序的任務。雖然我找不到有關如何做到這一點的文檔。

其它信息:

  1. 有上setGetTask小段落與所述redis documentation一個例子。

    getTask <- function (queue , job_id , ...) 
    { 
    
        key <- sprintf(" 
        redisEval("local x=redis.call('hkeys',KEYS[1])[1]; 
           if x==nil then return nil end; 
           local ans=redis.call('hget',KEYS[1],x); 
           redis.call('hdel',KEYS[1],x);i 
           return ans",key) 
    } 
    
    setGetTask(getTask) 
    

    我雖然覺得文檔中的代碼在語法上是不正確的(缺少恕我直言,一個「和右括號‘)’)。我認爲這是不可能的CRAN,作爲執行對文檔的代碼在提交。

  2. 改變getTask功能的工人越來越任務(的方面不會改變任何東西,即使引入明顯的非感到redisEval喜歡它更改爲redisEval(「DDDDDDDDDD(((」)

  3. 從sourc安裝包後,我只能訪問setGetTask函數E(這是我從official CRAN package page of version 1.1.1下載(這恕我直言,應該不是直接從CRAN安裝它沒有區別)

數據:任務執行的數據框看起來如下:

taskName;taskQueuePosition;parameter1;paramterN 
taskT;1;val1;10 
taskK;2;val2;8 
taskP;3;val3;7 
taskA;4;val4;7 

我想用'taskQueuePosition'來控制順序,應該先執行數字較小的任務。

問題:

  1. 是否有人知道的任何資源在那裏我可以得到與doRedis或setGetTask這樣做的更多信息?
  2. 有誰知道我需要如何改變getTask來實現上述?
  3. 任何其他聰明的想法來控制foreach循環中的執行順序?優選地,使得在某些時候,我可以使用doRedis作爲並行後端(由於複雜的技術基礎設施原因,改變這將意味着處理中的重大改變)。

代碼(爲了便於重現):

以下假定Redis的服務器開始在本地機器上。

Redis的DB灌裝:

library(doRedis) 
library(foreach) 

options('redis:num'=TRUE) # needed for proper execution 

REDIS_JOB_QUEUE = "jobs" 
registerDoRedis(REDIS_JOB_QUEUE) 

# filling up the data frame 
taskDF = data.frame(taskName=c("taskT","taskK","taskP","taskA"), 
      taskQueuePosition=c(1,2,3,4), 
      parameter1=c("val1","val2","val3","val4"), 
      parameterN=c(10,8,7,7)) 

foreach(currTask=iter(taskDF, by='row'), 
     .verbose = T 
) %dopar% { 
    print(paste("Executing task: ",currTask$taskName)) 
    Sys.sleep(currTask$parameterN) 
} 

removeQueue(REDIS_JOB_QUEUE) 

工人:

library(doRedis) 
REDIS_JOB_QUEUE = "jobs" 

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE) 

回答

0

我可以解決這個問題,現在可以控制任務的執行順序。

其他信息:

似乎是在文檔中一個錯字,呈現了getTask例如不工作。通過考慮從包中的文件task.R的default_getTask函數的形式,它或許應該是這個樣子:

getTaskDefault <- function (queue , job_id , ...) 
{ 
    key <- sprintf("%s:%s",queue, job_id) 
    return(redisEval("local x=redis.call('hkeys',KEYS[1])[1]; 
        if x==nil then return nil end; 
        local ans=redis.call('hget',KEYS[1],x); 
        redis.call('set', KEYS[1] .. '.start.' .. x, x); 
        redis.call('hdel',KEYS[1],x); 
        return ans",key)) 
} 

看來,在函數的第一線後面第一百分號字母迷路了。這將解釋括號和引號的數量不平衡。

2) setGetTask對我沒有任何影響。當我在填充數據庫時通過.option設置getTask函數(如在vignette of the package中描述的那樣)時,它被成功調用。

3)關於2)的信息表示我不需要getTask函數,所以我可以使用來自CRAN的包。

----- -----問題

1)的doRedis小品介紹瞭如何自定義getTask能夠建立成功。

2和3)當像下面那樣修改getTask函數中的LUA腳本時,任務將按照提交方式從數據庫中提取。這並不是我所要求的,但是由於時間的限制以及我有(或者更好的)這個事實不是關於LUA腳本的第一個想法,所以通過taskQueuePosition列來控制提交順序是一個令人滿意的解決方案。

getTaskInOrder <- function (queue , job_id , ...) 
{ 

    key <- sprintf("%s:%s",queue, job_id) 
    return(redisEval(" 

     local tasks=redis.call('hkeys',KEYS[1]); -- get all tasks 

     local x=tasks[1];   -- get first task available task 
     if x==nil then    -- if there are no tasks left, stop processing 
      return nil 
     end; 

     local xMin = 65535;   -- if we have more tasks than 65535, getting the 
     -- task with the lowest taskID is not guaranteed to be the first one 
     local i = 1; 
     -- local iMinFound = -1; 
     while (x ~= nil) do   -- search the array until there are no tasks left 
     -- print('x: ',x) 
     local xNum = tonumber(x); 
     if(xNum<xMin) then 
      xMin = xNum; 
      -- iMinFound = i; 
     end 
     i=i+1; 
     -- print('i is now: ',i); 
     x=tasks[i]; 
     end 
     -- print('Minimum is task number',xMin,' found at i ', iMinFound) 
     x=tostring(xMin)   -- convert it back to a string (maybe it would 
            -- be better to keep the original string somewhere, 
            -- in case we loose some information whilst converting to number) 

     -- print('x is now:',x); 
     -- print(KEYS[1] .. '.start.' .. x, x); 
     -- print(''); 
     local ans=redis.call('hget',KEYS[1],x); 
     redis.call('set', KEYS[1] .. '.start.' .. x, x); 
     redis.call('hdel',KEYS[1],x); 
     return ans",key)) 
} 

重要提示:我注意到,如果一個任務被中止,訂單搞砸了,並重新提交任務(即使任務數保持不變),將最初提交後執行任務。這對我來說沒問題。

------代碼(爲了便於再現):------

這導致下面的代碼示例(與在任務數據幀12個條目,而不是原來的4 ):

Redis的DB填料:

library(doRedis) 
library(foreach) 

options('redis:num'=TRUE) # needed for proper execution 

REDIS_JOB_QUEUE = "jobs" 

getTaskInOrder <- function (queue , job_id , ...) 
{ 
    ...like above 
} 

registerDoRedis(REDIS_JOB_QUEUE) 

# filling up the data frame already in order of tasks to be executed 
# otherwise the dataframe has to be sorted by taskQueuePosition 
taskDF = data.frame(taskName=c("taskA","taskB","taskC","taskD","taskE","taskF","taskG","taskH","taskI","taskJ","taskK","taskL"), 
     taskQueuePosition=c(1,2,3,4,5,6,7,8,9,10,11,12), 
     parameter1=c("val1","val2","val3","val4","val1","val2","val3","val4","val1","val2","val3","val4"), 
     parameterN=c(5,5,5,4,4,4,4,3,3,3,2,2)) 

foreach(currTask=iter(taskDF, by='row'), 
     .verbose = T, 
     .options.redis = list(getTask = getTaskInOrder 
) %dopar% { 
    print(paste("Executing task: ",currTask$taskName)) 
    Sys.sleep(currTask$parameterN) 
} 

removeQueue(REDIS_JOB_QUEUE) 

工人:

library(doRedis) 
REDIS_JOB_QUEUE = "jobs" 

startLocalWorkers(n=1, queue=REDIS_JOB_QUEUE) 

另注:以防萬一你正在處理長期的工作,因爲我做的,請儘管工人仍注意到a bug in redis 1.1.1,從而導致被重新提交任務(由於超時)(上CRAN當前版本)在他們身上工作。