2017-06-12 33 views
1

我的要求是處理股票市場的小時數據。 即,每個數據流間隔從源獲取一次數據,並通過DStream進行處理。如何設置Spark流接收器頻率?

我已經實現了一個自定義的接收器,通過實現onStart()和onStop()方法及其工作來廢棄/監視網站。遇到

挑戰:

  • 接收機線程連續即獲取數據,倍數次,每次間隔。
  • 無法協調接收器和DStream執行時間間隔。

選項我嘗試:

  1. 接收機主題到幾秒休眠(等於流間隔)。 在這種情況下,數據不是處理過程中的最新數據。

enter image description here

enter image description here

class CustomReceiver(interval: Int) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) { 

    def onStart() { 
    new Thread("Website Scrapper") { 
     override def run() { receive() } 
    }.start() 
    } 

    def onStop() { 

    } 

    /** Create a socket connection and receive data until receiver is stopped */ 
    private def receive() { 
    println("Entering receive:" + new Date()); 
    try { 
     while (!isStopped) { 
     val scriptsLTP = StockMarket.getLiveStockData() 
     for ((script, ltp) <- scriptsLTP) { 
      store(script + "," + ltp) 
     } 
     println("sent data") 
     System.out.println("going to sleep:" + new Date()); 
     Thread.sleep(3600 * 1000); 
     System.out.println("awaken from sleep:" + new Date()); 
     } 
     println("Stopped receiving") 
     restart("Trying to connect again") 
    } catch { 
     case t: Throwable => 
     restart("Error receiving data", t) 
    } 
    println("Exiting receive:" + new Date()); 
    } 
} 

如何使DSTREAM處理同步星火流接收器?

+0

當流式傳輸時間間隔開始時會獲取數據是否是一個選項?根據您的觀點, – maasg

回答

0

這個用例似乎不適合Spark Streaming。間隔足夠長以將其視爲常規批處理作業。這樣,我們可以更好地利用羣集資源。

我會通過並行化目標代碼將其重寫爲Spark作業,使用mapPartitions將執行程序作爲分佈式Web抓取程序使用,然後按預期進行處理。

然後安排Spark作業每小時運行cron或更高級的替代品,例如Chronos在需要的確切時間。

+0

,流式作業的最大可行時間間隔是多少? –

+0

@VijayInnamuri,這取決於用例,所以我不會親愛的把一個數字。如果你不得不處理的數據不斷,但你只需要1個報告/小時,我會考慮Spark Streaming。這裏的要點是,Spark Streaming是關於** streaming **數據的,而您的usecase類似於需要在特定時間運行的批處理作業。另請注意,Spark Streaming將在運行時分配羣集資源。你的用例看起來像每小時處理幾分鐘。如果這種低使用率會不斷阻塞大量資源,這將是不利的, – maasg

+0

感謝您的澄清。 –

相關問題