2016-05-31

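Spark Streaming: how to not restart the receiver after a receiver failure

We are using a custom Spark receiver that reads streaming data from a provided HTTP link. If the provided HTTP link is incorrect, the receiver fails. The problem is that Spark keeps restarting the receiver and the application never terminates. How can we tell Spark to terminate the application when the receiver fails?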

Here is an excerpt of our custom receiver:

def onStart(): Unit = {
  // Start the thread that receives data over a connection
  new Thread("Receiver") {
    override def run(): Unit = { receive() }
  }.start()
}

private def receive(): Unit = {
  // ...
  val response: CloseableHttpResponse = httpclient.execute(req)
  try {
    val sl = response.getStatusLine()
    if (sl.getStatusCode != 200) {
      val errorMsg = "Error: " + sl.getStatusCode
      val thrw = new RuntimeException(errorMsg)
      // Report the failure and stop the receiver
      stop(errorMsg, thrw)
    } else {
      // ...
      store(doc)
    }
  } finally {
    response.close() // release the HTTP connection
  }
}

We have a Spark Streaming application that uses this receiver:

val ssc = new StreamingContext(sparkConf, duration) 
val changes = ssc.receiverStream(new CustomReceiver(... 
... 
ssc.start() 
ssc.awaitTermination() 

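Everything works fine as long as the receiver has no errors. If the receiver fails (for example, because of a wrong HTTP link), Spark keeps restarting it and the application never terminates: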

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it. 

We simply want to terminate the whole application when the receiver fails.


Unfortunately, this is now deprecated. – Jake

Answers


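There is a way to control the lifecycle of a Spark Streaming application that is based on a custom receiver: define a job progress listener for your application and track what happens.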

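import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.ui.StreamingJobProgressListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CustomReceiverListener extends StreamingJobProgressListener {
    private static final Logger LOG = LoggerFactory.getLogger(CustomReceiverListener.class);

    // volatile: written on the listener-bus thread, read by the driver's monitor thread
    private volatile boolean receiverStopped = false;

    public CustomReceiverListener(StreamingContext ssc) {
        super(ssc);
    }

    public boolean isReceiverStopped() {
        return receiverStopped;
    }

    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped event) {
        super.onReceiverStopped(event); // keep the parent's bookkeeping
        LOG.info("Update the flag field");
        this.receiverStopped = true;
    }
}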

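In your driver, start a thread that monitors the state of the receiverStopped flag. When this thread finishes, the driver stops the streaming application. (A better approach would be to define a callback method in the driver that stops the streaming application.)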

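CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();

// Monitor thread: poll the listener's flag until the receiver has stopped
Thread thread = new Thread(() -> {
    while (!listener.isReceiverStopped()) {
        LOG.info("Sleepy head...");
        try {
            Thread.sleep(2 * 1000); // check again after 2 seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status and stop waiting
            return;
        }
    }
});
thread.start();
thread.join(); // block the driver until the monitor thread finishes
LOG.info("Listener asked to die! Going to commit suicide :(");
// stopSparkContext = true, stopGracefully = false
ssc.stop(true, false);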
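The answer mentions a callback as the better alternative to the polling thread. Below is a minimal sketch of that idea in plain Java so it runs without Spark. `StopCallbackListener` and its `onReceiverStopped(int streamId)` method are hypothetical stand-ins for the logic a real `StreamingListener` would forward from its `onReceiverStopped` hook, and the `Runnable` is where a driver would call `ssc.stop(true, false)`. The stop action runs on its own thread because, as far as I know, recent Spark versions refuse to stop a `StreamingContext` from inside the listener-bus thread that delivers listener events.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a streaming listener: only the stop-handling logic is shown.
final class StopCallbackListener {
    private final AtomicBoolean fired = new AtomicBoolean(false);
    private final Runnable stopAction;
    // Dedicated daemon thread, so the stop action never runs on the caller's (listener) thread.
    private final ExecutorService stopper = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "stop-streaming-context");
        t.setDaemon(true);
        return t;
    });

    StopCallbackListener(Runnable stopAction) {
        this.stopAction = stopAction;
    }

    // Would be called from the real listener's onReceiverStopped hook.
    void onReceiverStopped(int streamId) {
        // compareAndSet ensures the stop action is submitted at most once.
        if (fired.compareAndSet(false, true)) {
            stopper.submit(stopAction);
            stopper.shutdown(); // accept no more tasks; the submitted one still runs
        }
    }
}
```

With this shape, the driver's main thread no longer needs a sleep loop; it can simply call `ssc.awaitTermination()`, which returns once the callback has stopped the context.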

Note: if you have multiple instances of your receiver, change the CustomReceiverListener implementation to make sure that all receiver instances have stopped.
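For several receiver instances, one option is to count distinct stream IDs and release the waiting driver only when every expected receiver has reported a stop. A sketch in plain Java: `AllReceiversStoppedTracker`, its constructor argument (the number of receivers the driver created) and `onReceiverStopped(int streamId)` are hypothetical; the stream ID stands in for the `streamId` of the `ReceiverInfo` carried by Spark's `StreamingListenerReceiverStopped` event. A `CountDownLatch` also replaces the sleep-based polling.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: tracks which receivers have stopped and wakes the driver when all have.
final class AllReceiversStoppedTracker {
    private final Set<Integer> stoppedStreams = ConcurrentHashMap.newKeySet();
    private final CountDownLatch remaining;

    AllReceiversStoppedTracker(int expectedReceivers) {
        this.remaining = new CountDownLatch(expectedReceivers);
    }

    // A stream that reports several stop events is counted only once.
    void onReceiverStopped(int streamId) {
        if (stoppedStreams.add(streamId)) {
            remaining.countDown();
        }
    }

    boolean allStopped() {
        return remaining.getCount() == 0;
    }

    // Blocks the calling (driver) thread instead of polling with Thread.sleep.
    boolean awaitAllStopped(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }
}
```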


I think this has been deprecated. – Jake


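I found a way to stop and restart a custom receiver at any time, from anywhere in the JVM. It is prototyped but not stress-tested. It does not shut down the hierarchy of receiver supervisors and so on, but they essentially become idle until the custom receiver is restarted.

It seems to respect the Spark API.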

  • Create a singleton object that holds a HashMap. In the map, keep each custom receiver and its desired state (enabled or disabled).


import scala.collection.mutable

case class IKodaTextSocketReceiverStatus(receiver: IKodaTextSocketReceiver, enabled: Boolean)

object IKodaTextSocketReceiver extends Logging {
  // Receivers by ID, each with its desired state (enabled or disabled)
  val receiverMap: mutable.HashMap[String, IKodaTextSocketReceiverStatus] =
    mutable.HashMap[String, IKodaTextSocketReceiverStatus]()

  /** Re-enables and restarts a registered receiver; returns false if `recId` is unknown. */
  def restartReceiver(recId: String): Boolean =
    receiverMap.get(recId) match {
      case Some(status) =>
        logger.info("Found existing receiver")
        receiverMap.put(recId, status.copy(enabled = true))
        status.receiver.onStart()
        true
      case None =>
        false
    }
}
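The registry above is written from the receiver's thread and read from `onStart` and the driver, so a concurrent map may be safer than an unsynchronized `mutable.HashMap`. Here is a minimal sketch of the same registry idea in Java, storing only the enabled flag per receiver ID; `ReceiverRegistry` and its methods are hypothetical names. Like the Scala object, it is per-JVM state, so the approach presumably relies on the running receiver living in the same JVM that populated the map (for example, local mode).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-JVM registry: receiver ID -> desired enabled state.
final class ReceiverRegistry {
    private static final Map<String, Boolean> ENABLED = new ConcurrentHashMap<>();

    private ReceiverRegistry() {}

    static void register(String receiverId) {
        ENABLED.put(receiverId, Boolean.TRUE);
    }

    // Mirrors isEnabled(): unknown IDs are treated as disabled.
    static boolean isEnabled(String receiverId) {
        return ENABLED.getOrDefault(receiverId, Boolean.FALSE);
    }

    // Updates a registered ID atomically; returns false if the ID was never registered.
    static boolean setEnabled(String receiverId, boolean enabled) {
        return ENABLED.computeIfPresent(receiverId, (id, old) -> enabled) != null;
    }
}
```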

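The enabled Boolean can be set from anywhere, inside or outside the receiver; I set it from the receive method. The onStart method checks the receiver's enabled state. If it is false, onStart does not start a new thread for the receive method... and there is silence :)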

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class IKodaTextSocketReceiver(host: String, port: Int, receiverId: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  // Register this instance as enabled when it is constructed
  IKodaTextSocketReceiver.receiverMap.put(receiverId, IKodaTextSocketReceiverStatus(this, enabled = true))

  def isEnabled(): Boolean =
    IKodaTextSocketReceiver.receiverMap.get(receiverId).exists(_.enabled)

  def onStart(): Unit = {
    if (!isEnabled()) {
      // Disabled: do not start a new receive thread
      logger.warn("Restarting after stop set")
    } else if (isStopped()) {
      logger.info("Receiver already stopped")
    } else {
      logger.info("Starting IKodaTextSocketReceiver")
      // Start the thread that receives data over a connection
      new Thread("IKodaTextSocketReceiver") {
        override def run(): Unit = receive()
      }.start()
    }
  }

  def onStop(): Unit = {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    try {
      logger.info(s"Connecting to $host : $port")
      socket = new Socket(host, port)
      logger.info(s"Connected to $host : $port")
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      var userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
        // readLine() returns null at end of stream, so check before calling contains
        if (userInput != null && userInput.contains(StreamingConstants.IKODA_END_STREAM)) {
          logger.info(s"${StreamingConstants.IKODA_END_STREAM}: Calling disabling receiver")
          // Disable this receiver so that the next onStart() does nothing
          IKodaTextSocketReceiver.receiverMap.put(receiverId, IKodaTextSocketReceiverStatus(this, enabled = false))
          //stop("Exiting, hopefully with elegance and dignity.")
        }
      }
      reader.close()
      socket.close()
      logger.info("Stopped receiving")
      // restart() calls onStop() and then onStart(), which does nothing once disabled
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        restart(s"Error connecting to $host : $port", e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
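The enable/disable gate in `onStart` can be exercised without Spark. In this sketch, `GatedReceiver` is a hypothetical stand-in: `onStart()` launches a worker thread only while the receiver is enabled, the worker reads lines from a string instead of a socket, and it disables the receiver when it sees a marker (the hypothetical `END_MARKER` plays the role of `StreamingConstants.IKODA_END_STREAM`). Unlike the loop above, the sketch stops reading at the marker instead of draining the rest of the stream. In Spark, `restart(...)` calls `onStop()` and then `onStart()` again after a delay; here a later `onStart()` simply returns without starting anything.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a custom receiver with an enable/disable gate.
final class GatedReceiver {
    static final String END_MARKER = "END_STREAM"; // hypothetical end-of-stream marker
    private final AtomicBoolean enabled = new AtomicBoolean(true);
    final List<String> stored = new CopyOnWriteArrayList<>(); // stands in for store()
    private final String input; // stands in for the socket's input stream

    GatedReceiver(String input) {
        this.input = input;
    }

    boolean isEnabled() {
        return enabled.get();
    }

    // Returns the started worker thread, or null when the receiver is disabled.
    Thread onStart() {
        if (!enabled.get()) {
            return null; // disabled: no new receive thread
        }
        Thread worker = new Thread(this::receive, "GatedReceiver");
        worker.start();
        return worker;
    }

    private void receive() {
        try (BufferedReader reader = new BufferedReader(new StringReader(input))) {
            String line;
            // readLine() returns null at end of stream, so test it before using the line.
            while ((line = reader.readLine()) != null) {
                if (line.contains(END_MARKER)) {
                    enabled.set(false); // later onStart() calls become no-ops
                    break;
                }
                stored.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```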

Start the custom receiver by simply checking whether it is registered in the HashMap. If it is not, create a new one. If it is, just set it to enabled.

def doStream(ip: String, port: Int): Unit = {
  try {
    // Reuse the registered receiver if there is one; otherwise build a new stream
    if (!IKodaTextSocketReceiver.restartReceiver("MyFirstReceiver")) {
      val ssc = getSparkStreamingContext(fieldVariables)
      ssc.checkpoint("./ikoda/cp")
      logger.info("open stream to " + ip + " " + port)

      val ikReceiver = new IKodaTextSocketReceiver(ip, port, "MyFirstReceiver")
      val lines: DStream[String] = ssc.receiverStream(ikReceiver)

      // etc etc etc
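In `doStream`, checking the registry and creating a receiver are two separate steps, so two concurrent callers could both see "not registered". If that matters, the check and the creation can be made one atomic step. A sketch with `ConcurrentHashMap.compute`; `StreamStarter`, `startOrReenable` and the `Supplier` factory are hypothetical, and the factory marks where a new receiver would be built. Work done inside `compute` should stay short, since it holds a lock on that map entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical start-or-reuse helper mirroring doStream() + restartReceiver().
final class StreamStarter<R> {
    // Per-ID status: the receiver object and its enabled flag.
    private static final class Status<T> {
        final T receiver;
        final boolean enabled;

        Status(T receiver, boolean enabled) {
            this.receiver = receiver;
            this.enabled = enabled;
        }
    }

    private final Map<String, Status<R>> statuses = new ConcurrentHashMap<>();

    // Atomically creates a receiver for an unknown ID, or re-enables the existing one.
    // Returns true only when a new receiver was created.
    boolean startOrReenable(String id, Supplier<R> factory) {
        boolean[] created = {false};
        statuses.compute(id, (key, existing) -> {
            if (existing == null) {
                created[0] = true;
                return new Status<>(factory.get(), true);
            }
            return new Status<>(existing.receiver, true); // same receiver, enabled again
        });
        return created[0];
    }

    void disable(String id) {
        statuses.computeIfPresent(id, (key, s) -> new Status<>(s.receiver, false));
    }

    boolean isEnabled(String id) {
        Status<R> s = statuses.get(id);
        return s != null && s.enabled;
    }

    R receiver(String id) {
        Status<R> s = statuses.get(id);
        return s == null ? null : s.receiver;
    }
}
```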