2017-08-09 16 views
0

我與Apache火花火花殼工作不正確的輸出,當我執行SQL查詢,我得到不同的輸出火花給予了一定的價值和正確的輸出對於一些價值

我必須有一個主設備和一個從火花集羣節點。在主人身上,有一名工人,在從屬節點上有一名工人。所以我總共有兩個工人節點。

現在,當我堅持一些數據,然後在每次獲得兩個不同輸出時對該持久數據執行一些過濾器,但這兩個不同的輸出是截然不同的,它們不會每次都改變,總之,我得到兩個不同的輸出爲一個SQL查詢。

我在主從節點上都有MySQL數據庫,在這個節點上我有一張表,它有50000條記錄,它也在主服務器上,主服務器上也有50000條記錄,這50k + 50k條記錄不一樣。

所以當我查詢結果變得不同。這裏是我正在嘗試的代碼,也是輸出的截圖。

spark-shell --conf spark.sql.warehouse.dir=C:\spark-warehouse --master spark://192.168.0.31:7077 

val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/cmanalytics?zeroDateTimeBehavior=convertToNull&user=root&password=root", "dbtable" -> "cmanalytics.demo_call", "fetchSize" -> "1000", "partitionColumn" -> "newpartition", "lowerBound" -> "0", "upperBound" -> "4", "numPartitions" -> "4")).load() 

jdbcDF.createOrReplaceTempView("demo_call") 

val sqlDF = sql("select * from demo_call").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) 

sqlDF.show() 

val d = sqlDF.filter(sqlDF("campaign_id")===141).groupBy("classification_id").count 

d.count 

和輸出的屏幕截圖是

enter image description here

任何一個可以幫助解決這個問題?

感謝

回答

1

正如你可能知道,星火確實懶惰的評價,在這裏你的問題很簡單,你認爲show將迫使你DataFrame的評價,但假設是錯誤的。 show沒有這樣的保證,並且很可能只評估行的子集。爲了強制對整個DataFrame進行評估,您需要首先調用一個動作,如count

val sqlDF = sql("sql("select count(*) from demo_call where classification_id = 141").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)").persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) 

sqlDF.count // to force evaluation 

show每次調用應該給你相同的結果從現在開始

+0

這意味着datafrme.show不評估整個數據幀的權利? –

+0

是的,這是正確的:) –

+0

謝謝,但我在兩臺機器上做了那些我在那臺機器上的數據,有些數據丟失了,所以我把數據放在兩邊,並嘗試做同樣的事情,有用。 –