我處於想停止/取消代碼中的flink作業的情況。這是在我的集成測試中,我將任務提交給我的flink作業並檢查結果。由於作業是異步運行的,即使測試失敗/通過也不會停止。測試結束後,我想要停止工作。從代碼中取消Apache Flink作業
我想這我下面列舉幾件事情:
- 獲取jobmanager演員
- 獲取運行作業
- 對於每個正在運行的作業,發送取消請求jobmanager
這當然在沒有運行,但我不確定jobmanager actorref是否錯誤或其他東西丟失。
我得到的錯誤是:[flink-akka.actor.default-dispatcher-5] [akka:// flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages $ RequestRunningJobsStatus $ ]從演員[akka:// flink/temp/$ a]到演員[akka:// flink/user/jobmanager_1]未送達。 [1]遇到了死信。此日誌記錄可以關閉或通過配置設置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'進行調整。
這意味着作業管理器參與者錯誤或者發送給它的消息不正確。
的代碼如下所示:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
可有人檢查,如果這是正確的做法?
編輯: 要完全停止作業,必須先按照TaskManager的順序先停止TaskManager,再按JobManager,然後再按JobManager。
嗨直到,你指出的是正確的。有沒有辦法獲得jobmanager的遠程地址?我無法訪問FlinkMiniCluster,因爲它是由環境直接創建的。我想停止作爲集成測試一部分的工作,現在我已經抽象出了創建FlinkMiniCluster的代碼,以便我可以控制它並停止它。 – Tej
目前,Flink API不提供這樣的功能。但是我們正在努力通過引入一個'JobClient'來改善這個問題,您可以更明確地控制正在運行的作業。到目前爲止,我擔心創建自己的'FlinkMiniCluster'並將集成測試作業提交給它是實現您所描述內容的最佳方式。 –
非常感謝直到! – Tej