我的要求是處理股票市場的小時數據。 即,每個數據流間隔從源獲取一次數據,並通過DStream進行處理。如何設置Spark流接收器頻率?
我已經實現了一個自定義的接收器,通過實現onStart()和onStop()方法及其工作來廢棄/監視網站。遇到
挑戰:
- 接收機線程連續即獲取數據,倍數次,每次間隔。
- 無法協調接收器和DStream執行時間間隔。
選項我嘗試:
- 接收機主題到幾秒休眠(等於流間隔)。 在這種情況下,數據不是處理過程中的最新數據。
class CustomReceiver(interval: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart() {
new Thread("Website Scrapper") {
override def run() { receive() }
}.start()
}
def onStop() {
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
println("Entering receive:" + new Date());
try {
while (!isStopped) {
val scriptsLTP = StockMarket.getLiveStockData()
for ((script, ltp) <- scriptsLTP) {
store(script + "," + ltp)
}
println("sent data")
System.out.println("going to sleep:" + new Date());
Thread.sleep(3600 * 1000);
System.out.println("awaken from sleep:" + new Date());
}
println("Stopped receiving")
restart("Trying to connect again")
} catch {
case t: Throwable =>
restart("Error receiving data", t)
}
println("Exiting receive:" + new Date());
}
}
如何使DSTREAM處理同步星火流接收器?
當流式傳輸時間間隔開始時會獲取數據是否是一個選項?根據您的觀點, – maasg