2017-02-09 84 views
4

我處於想停止/取消代碼中的flink作業的情況。這是在我的集成測試中,我將任務提交給我的flink作業並檢查結果。由於作業是異步運行的,即使測試失敗/通過也不會停止。測試結束後,我想要停止工作。從代碼中取消Apache Flink作業

我想這我下面列舉幾件事情:

  1. 獲取jobmanager演員
  2. 獲取運行作業
  3. 對於每個正在運行的作業,發送取消請求jobmanager

這當然在沒有運行,但我不確定jobmanager actorref是否錯誤或其他東西丟失。

我得到的錯誤是:[flink-akka.actor.default-dispatcher-5] [akka:// flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages $ RequestRunningJobsStatus $ ]從演員[akka:// flink/temp/$ a]到演員[akka:// flink/user/jobmanager_1]未送達。 [1]遇到了死信。此日誌記錄可以關閉或通過配置設置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'進行調整。

這意味着作業管理器參與者錯誤或者發送給它的消息不正確。

的代碼如下所示:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path 
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url 
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS)) 
    try { 
     val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS)) 
     if(result.isInstanceOf[RunningJobsStatus]){ 
     val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages() 
     val itr = runningJobs.iterator() 
     while(itr.hasNext){ 
      val jobId = itr.next().getJobId 
      val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS))); 
      try { 
      Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS)) 
      } 
      catch { 
      case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e 
      } 

     } 
     } 
    } 
    catch{ 
     case e : Exception => "Could not retrieve running jobs from the JobManager." + e 
    } 

    } 

可有人檢查,如果這是正確的做法?

編輯: 要完全停止作業,必須先按照TaskManager的順序先停止TaskManager,再按JobManager,然後再按JobManager。

回答

2

您正在創建新的ActorSystem,然後嘗試在同一個演員系統中找到名稱爲/user/jobmanager_1的演員。這將不起作用,因爲實際的工作經理將運行在不同的ActorSystem

如果你想獲得ActorRef給真實的工作經理,你必須使用相同的ActorSystem進行選擇(那麼你可以使用本地地址),或者你已經找到了作業管理器參與者的遠程地址。遠程地址格式爲akka.tcp://[email protected][address_of_actor_system]/user/jobmanager_[instance_number]。如果您有權訪問FlinkMiniCluster,則可以使用leaderGateway承諾獲取當前領導者的ActorGateway

+0

嗨直到,你指出的是正確的。有沒有辦法獲得jobmanager的遠程地址?我無法訪問FlinkMiniCluster,因爲它是由環境直接創建的。我想停止作爲集成測試一部分的工作,現在我已經抽象出了創建FlinkMiniCluster的代碼,以便我可以控制它並停止它。 – Tej

+1

目前,Flink API不提供這樣的功能。但是我們正在努力通過引入一個'JobClient'來改善這個問題,您可以更明確地控制正在運行的作業。到目前爲止,我擔心創建自己的'FlinkMiniCluster'並將集成測試作業提交給它是實現您所描述內容的最佳方式。 –

+0

非常感謝直到! – Tej