2013-04-18 189 views
17

我正在使用R包foreach()%dopar%並行執行長(〜天)計算。我希望能夠在其中一個產生錯誤的情況下停止整套計算。但是,我還沒有找到實現這一目標的方法,從文檔和各種論壇中,我發現沒有跡象表明這是可能的。特別是,break()不起作用,stop()只停止當前計算,而不是整個foreach循環。有沒有辦法擺脫foreach循環?

請注意,我不能使用簡單的for循環,因爲最終我想使用doRNG軟件包對其進行並行化。

這裏是我嘗試什麼的簡化的,可重複的版本(這裏顯示的序列與%do%,但我用doRNG%dopar%時有同樣的問題)。請注意,實際上我想要並行運行此循環的所有元素(此處爲10)。

library(foreach) 
myfunc <- function() { 
    x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% { 
    cat("Element ", k, "\n") 
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach 
    if(is.element(k, 2:6)) { 
     cat("Should stop\n") 
     stop("Has stopped") 
    } 
    k 
    } 
    return(x) 
} 
x <- myfunc() 
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself. 
# x is not returned. The execution produces the error message 
# Error in { : task 2 failed - "Has stopped" 

我想達成什麼是整個foreach循環可以立即在某種狀態中退出(在這裏,遇到stop()時)。

我發現無法通過foreach實現此目的。看起來我需要一種方法將消息發送到所有其他進程以使它們停止。

如果foreach不可能,有沒有人知道的替代品?我也試圖用parallel::mclapply來達到這個目的,但那也行不通。

> sessionInfo() 
R version 3.0.0 (2013-04-03) 
Platform: x86_64-apple-darwin10.8.0 (64-bit) 

locale: 
[1] C/UTF-8/C/C/C/C 

attached base packages: 
[1] stats  graphics grDevices utils  datasets methods base 

other attached packages: 
[1] foreach_1.4.0 

loaded via a namespace (and not attached): 
[1] codetools_0.2-8 compiler_3.0.0 iterators_1.0.6 
+0

難道不可以用'for'代替嗎? –

+0

不,因爲最終我想使用doRNG軟件包將其並行化。 (對不起,我在原始文章中沒有說清楚:我已經編輯它來明確這一點。) –

+3

根據您的其他意見,您可能希望讓每個子流程都可以設置「標誌」對象失敗,並使該對象可供所有子流程讀取。他們都必須有一些內部斷點或者等同的東西來定期檢查'旗幟'的價值,這樣他們都可以自行終止。 –

回答

2

這不是直接回答你的問題,但使用when()你可以儘量避免進入循環,如果滿足條件:

x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:% 
    when(!is.element(k, 2:6)) %do% 
    { 
    cat("Element ", k, "\n") 
    Sys.sleep(0.5) 
    k 
    } 

編輯:

我忘了什麼事:我認爲這是通過設計,你不能停止foreach循環。如果並行運行循環,則每個循環都是獨立處理的,這意味着當您停止整個循環的k=2時,如果k=1的處理已終止或仍在運行,則無法預測。因此,使用when()條件可以給出確定性結果。

編輯2:考慮您的意見的另一種解決方案。

shouldStop <- FALSE 
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% 
    { 
    if(!shouldStop){ 
     # put your time consuming code here 
     cat("Element ", k, "\n") 
     Sys.sleep(0.5) 
     shouldStop <- shouldStop || is.element(k, 2:6) 
     k 
    } 
    } 

使用該解決方案,同時停止條件爲真,其正在運行的進程仍在計算結束,但你避免所有即將到來的過程的時間消耗。

+0

問題是我只知道我是否想在循環中完成一些計算後退出循環。然而,這正是我想要與這個循環並行的這些計算。 (換句話說,條件本身是耗時的計算。) –

+0

編輯2是一個有用的建議,但我運行的方式是,要處理的事件的數量等於可用的CPU內核的數量(10- 50)。因此所有進程都是同時啓動的,並且沒有將來的進程可以避免啓動。現在,我必須等待所有這些完成,然後才能從stop()獲得錯誤消息。當我看到cat()(在我的文章中)生成的消息時,解決方法是手動殺死整個程序,但這是不切實際的,因爲這是一個長期運行(〜1天)並且運行在在遠程機器上的背景。 –

+0

這個信息改變了整個事情,應該在原文中提及。但是,我必須承認,在這種情況下,我的想法是有限的。您可以嘗試直接控制節點,例如使用'snow'包中的'clusterApply',並在完成具有所需結果的第一個作業時調用'stopCluster()'。但請注意,從slave進程調用'stopCluster()'不僅會帶來醜陋的錯誤。此外,結果不會返回給主人。也許其他人有一個想法如何可以傳遞結果? – Beasterfield

11

這聽起來像你想急躁版本的「停止」錯誤處理。您可以通過編寫自定義組合函數來實現該功能,並在返回每個結果後立即調用它。要做到這一點,你需要:

  • 使用後端,它支持呼籲即時combine,像doMPIdoRedis
  • 不要啓用.multicombine
  • 設置.inorderFALSE
  • 設置.init (如NULL

下面是一個例子, :

library(foreach) 
parfun <- function(errval, n) { 
    abortable <- function(errfun) { 
    comb <- function(x, y) { 
     if (inherits(y, 'error')) { 
     warning('This will leave your parallel backend in an inconsistent state') 
     errfun(y) 
     } 
     c(x, y) 
    } 
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval', 
      .combine='comb', .inorder=FALSE, .init=NULL) %dopar% { 
     if (i == errval) 
     stop('testing abort') 
     Sys.sleep(10) 
     i 
    } 
    } 
    callCC(abortable) 
} 

請注意,我還設置了錯誤處理「傳遞」,所以foreach將調用一個錯誤的對象結合功能。無論在foreach和後端中使用的錯誤處理如何,callCC函數都用於從foreach循環返回。在這種情況下,callCC將調用abortable函數,並將函數對象傳遞給force以立即返回。通過從組合函數中調用該函數,當我們檢測到錯誤對象時,我們可以從foreach循環中退出,並且callCC返回該對象。有關更多信息,請參閱?callCC

您可以實際使用parfun沒有註冊的並行後端並驗證foreach循環「斷裂」,只要它執行的是拋出一個錯誤的任務,但因爲任務是順序執行的,可能需要一段時間。例如,這需要20秒,如果沒有後臺註冊到執行:

print(system.time(parfun(3, 4))) 

當並行執行parfun,我們需要做更多的不是簡單地跳出foreach循環:我們還需要停止的工人,否則他們將繼續計算他們分配的任務。隨着doMPI,工人可以使用mpi.abort停止:

library(doMPI) 
cl <- startMPIcluster() 
registerDoMPI(cl) 
r <- parfun(getDoParWorkers(), getDoParWorkers()) 
if (inherits(r, 'error')) { 
    cat(sprintf('Caught error: %s\n', conditionMessage(r))) 
    mpi.abort(cl$comm) 
} 

注意羣集對象不能使用循環中止後,因爲事情並沒有正確地清理,這就是爲什麼正常的「一站式」錯誤處理不能以這種方式工作。

+0

對評論+1,對你的書幫助了我很多:) – statquant

+0

如果在聯合功能comb()中不再是停止(),我有點困惑於什麼導致了早期退出。我認爲foreach中的stop()觸發了comb()的調用。那麼是errfun()會導致提前退出嗎?但是errfun()是什麼?它沒有明確定義(並且名稱是任意的)。另外,當我在4個內核上運行帶有%dopar%和doMPI的parfun(6,12)時,執行繼續爲i = 5,7,8,9(在下面的答案中使用sink()方法驗證),所以我我不確定它在平行運行時是否真的停止了。 –

+0

「foreach」循環中的「stop」只會導致將錯誤對象作爲任務結果返回給主服務器。由於錯誤處理是「通過」,所以「foreach」將它傳遞給組合函數,並且由於指定的選項而立即執行。如果組合函數調用errfun,則組合函數不會返回給調用者,而是返回到'callCC'。但正如我在修改後的答案中所說的,這對工人沒有任何影響,這就是爲什麼需要mpi.abort。 –

-1

史蒂夫韋斯頓的原始答案基本上回答了這個問題。但是他的回答略有修改,它也以我需要的方式保留了兩個附加功能:(1)隨機數生成; (2)打印運行時診斷。

suppressMessages(library(doMPI)) 

comb <- function(x, y) { 
    if(inherits(y, 'error')) { 
    stop(y) 
    } 
    rbind(x, y) # forces the row names to be 'y' 
} 

myfunc <- function() { 
    writeLines(text="foreach log", con="log.txt") 
    foreach(i=1:12, .errorhandling='pass', .combine='comb', .inorder=FALSE, .init=NULL) %dopar% { 
    set.seed(100) 
    sink("log.txt", append=TRUE) 
    if(i==6) { 
     stop('testing abort') 
    } 
    Sys.sleep(10) 
    cat("Completed task", i, "\n") 
    sink(NULL) 
    rnorm(5,mean=i) 
    } 
} 

myerr <- function(e) { 
    cat(sprintf('Caught error: %s\n', conditionMessage(e))) 
    mpi.abort(cl$comm) 
} 

cl <- startMPIcluster(4) 
registerDoMPI(cl) 
r <- tryCatch(myfunc(), error=myerr) 
closeCluster(cl) 

當這個文件被採購,它離開作爲用於與一個錯誤消息

> source("exp2.R") 
    4 slaves are spawned successfully. 0 failed. 
Caught error: testing abort 
[ganges.local:16325] MPI_ABORT invoked on rank 0 in communicator with errorcode 0 

在「log.txt的」文件提供正確的診斷到錯誤的點,然後提供附加的錯誤信息。最重要的是,只要遇到foreach循環中的stop(),所有任務的執行都會暫停:它不會一直等到整個foreach循環完成。因此,我只能看到「完成的任務」消息,直到i = 4。 (請注意,如果Sys.sleep()更短,則後續任務可能會在mpi.abort()處理之前啓動。)

如果將停止條件更改爲「i == 100」,則停止,因此錯誤不會被觸發。代碼成功存在,沒有錯誤消息,r是尺寸爲12 * 5的二維數組。

順便說一句,我似乎並不需要.inorder = FALSE(我認爲只是在發現錯誤的情況下給我一個小的速度增加)。

+0

我改變了我的答案,因爲我發現它正在利用doMPI中的錯誤處理。在組合函數中執行'stop'不應該終止foreach,這在R-forge的doMPI開發版本中已經修復,所以當你的答案被釋放時,你的答案將不起作用。 –

+0

如果您沒有設置'.inorder = FALSE',那麼在組合函數處理完所有先前的任務之後纔會調用組合函數。因此,如果失敗的任務不是第一項任務,則在您的示例中至少需要10秒才能中止。 –

0

我沒有太多的運氣得到foreach做我想做的,所以這裏是一個解決方案,使用parallel包似乎做我想做的。我使用mcparallel()中的intermediate選項將結果從我的函數do.task()立即傳遞給函數check.res()。如果do.task()引發錯誤,則在check.res()中使用此值來觸發調用tools::pskill以顯式終止所有工作人員。這可能不是很優雅,但它的作用在於它可以立即停止所有工作。此外,我可以簡單地從當前環境繼承我在do.task()中處理所需的所有變量。 (在現實中do.task()是一個更復雜的功能需要許多變量的傳遞。)

library(parallel) 

# do.task() and check.res() inherit some variables from enclosing environment 

do.task <- function(x) { 
    cat("Starting task", x, "\n") 
    Sys.sleep(5*x) 
    if(x==stopat) { 
    stop("Error in job", x) # thrown to mccollect() which sends it to check.res() 
    } 
    cat(" Completed task", x, "\n") 
    return(10*x) 
} 

check.res <- function(r) { # r is list of results so far 
    cat("Called check.res\n") 
    sendKill <- FALSE 
    for(j in 1:Njob) { # check whether need to kill 
    if(inherits(r[[j]], 'try-error')) { 
     sendKill <- TRUE 
    } 
    } 
    if(sendKill) { # then kill all 
    for(j in 1:Njob) { 
     cat("Killing job", job[[j]]$pid, "\n") 
     tools::pskill(job[[j]]$pid) # mckill not accessible 
    } 
    } 
} 

Tstart <- Sys.time() 
stopat <- 3 
Njob <- 4 
job <- vector("list", length=Njob) 
for(j in 1:Njob) { 
    job[[j]]<- mcparallel(do.task(j)) 
} 
res <- mccollect(job, intermediate=check.res) # res is in order 1:Njob, regardless of how long jobs took 
cat("Collected\n") 
Tstop <- Sys.time() 
print(difftime(Tstop,Tstart)) 
for(j in 1:Njob) { 
    if(inherits(res[[j]], 'try-error')) { 
    stop("Parallel part encountered an error") 
    } 
} 

這給下面的屏幕轉儲和結果變量res

> source("exp5.R") 
Starting task 1 
Starting task 2 
Starting task 3 
Starting task 4 
    Completed task 1 
Called check.res 
Called check.res 
    Completed task 2 
Called check.res 
Called check.res 
Called check.res 
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res 
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res 
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Collected 
Time difference of 15.03558 secs 
Error in eval(expr, envir, enclos) : Parallel part encountered an error 
> res 
$`21423` 
[1] 10 

$`21424` 
[1] 20 

$`21425` 
[1] "Error in do.task(j) : Error in job3\n" 
attr(,"class") 
[1] "try-error" 
attr(,"condition") 
<simpleError in do.task(j): Error in job3> 

$`21426` 
NULL 
1

我從革命得到的答覆技術支持:「目前沒有辦法停止所有並行計算中的任何錯誤。」