2017-06-05 59 views
1

我有一個非常簡單的火花流作業在獨立模式下本地運行。有一個客戶接收器從數據庫中讀取數據並將其傳遞給打印總數的主要作業。不是一個實際的用例,但我正在玩耍學習。問題在於工作被永久卡住,邏輯非常簡單,所以我認爲它既沒有做任何處理,也沒有記憶問題。奇怪的是,如果我停止工作,突然在日誌中看到作業執行的輸出和其他支持的作業!有人能幫我理解這裏發生了什麼嗎?Spark Streaming爲什麼只有在終止它時纔開始作業?

val spark = SparkSession 
    .builder() 
    .master("local[1]") 
    .appName("SocketStream") 
    .getOrCreate() 

val ssc = new StreamingContext(spark.sparkContext,Seconds(5)) 
val lines = ssc.receiverStream(new HanaCustomReceiver()) 


lines.foreachRDD{x => println("==============" + x.count())} 

ssc.start() 
ssc.awaitTermination() 

enter image description here

終止程序以下日誌卷其示出了批料的執行之後 -

17/06/05 15:56:16 INFO JobGenerator: Stopping JobGenerator immediately 
17/06/05 15:56:16 INFO RecurringTimer: Stopped timer for JobGenerator after time 1496696175000 
17/06/05 15:56:16 INFO JobGenerator: Stopped JobGenerator 
==============100 

回答

1

TL; DR使用local[2]起碼。

的問題是以下行:

.master("local[1]") 

你應該至少有2個線程使用於火花流或串流的工作接收機沒有得到甚至有機會開始因爲它們會被卡住等待資源,即免費線程分配。

引用星火流的A Quick Example

//主需要2個內核從飢餓的情況,以防止。

我的建議是使用local[2](最小值)或local[*]來取儘可能多的內核。最好的解決方案是使用像Apache Mesos,Hadoop YARN或Spark Standalone這樣的集羣管理器。

相關問題