2016-07-12 63 views
1

之後緩存我有我repartitined基於節點我需要repartitining

val config=new SparkConf().setAppName("MyHbaseLoader").setMaster("local[10]") 
val context=new SparkContext(config) 
val sqlContext=new SQLContext(context) 
val rows="sender,time,time(utc),reason,context-uuid,rat,cell-id,first-pkt,last-pkt,protocol,sub-proto,application-id,server-ip,server-domain-name, http-proxy-ip,http-proxy-domain-name, video,packets-dw, packets-ul, bytes-dw, bytes-ul" 
val scheme= new StructType(rows.split(",").map(e=>new StructField(e.trim,StringType,true))) 
val dFrame=sqlContext.read 
    .schema(scheme) 
    .format("csv") 
    .load("E:\\Users\\Mehdi\\Downloads\\ProbDocument\\ProbDocument\\ggsn_cdr.csv") 

dFrame.registerTempTable("GSSN") 
dFrame.persist(StorageLevel.MEMORY_AND_DISK) 

val distincCount=sqlContext.sql("select count(distinct sender) as SENDERS from GSSN").collectAsList().get(0).get(0).asInstanceOf[Long] 
dFrame.repartition(distincCount.toInt/3,dFrame("sender")) 

我需要重新分區對數據幀下一減少工作後再次打電話給我presist方法在其主鍵MADDE一個dataaframe?

回答

0

是的,repartition返回一個新的數據幀,所以你需要cache它再次。

0

雖然由Dikei提供的答案似乎解決您的直接問題需要注意的是,在這樣的情況下,重要的是通常沒有理由在所有明確的緩存。

在星火每洗牌(這裏是repartition)作爲隱式緩存點。如果血統的某些部分必須被重新執行,並且沒有一個執行者丟失了,那麼它不會比最後一次洗牌和閱讀洗牌文件更遠。

這意味着,緩存之前或剛過洗牌通常的時間和資源的浪費,特別是如果你不感興趣的內存僅或者一些非標準的緩存機制。

+0

糾正我,如果我錯了,但我認爲在洗牌過程中沒有緩存,只有地圖階段的結果放到磁盤。如果它們在OS緩存中,則從這些文件重新計算ShuffleRDD是快速的,但不能保證。 – Dikei

+0

@Dikei你沒有錯,但它不影響我的意見。如果OP從一開始就接受寫入磁盤的操作,那麼將兩次相同的數據放到磁盤上是非常浪費的。既然緩存既不是免費的也不是保證的,我在這裏沒有看到任何這種冗餘的理由。它只是在內存中,或者是堆外的,也許......但我可能是錯的。你認爲這裏真的有意義嗎? – zero323

+0

我認爲這取決於程序的其餘部分。 – Dikei

0

您將需要持續的賠償數據幀,因爲DataFrames是不變的,賠償返回一個新的數據幀。

,你可以遵循的做法是堅持DFRAME及其修復後返回新的數據幀是dFrameRepart。在這個階段,如果你不再使用dFrame,你可以堅持dFrameRepart並取消dFrame以釋放內存。如果在修復操作後使用dFrame,則兩個DataFrame都可以保留。

dFrame.registerTempTable( 「GSSN」) dFrame.persist(StorageLevel.MEMORY_AND_DISK)

VAL distincCount = sqlContext.sql( 「SELECT COUNT(DISTINCT發送者)從GSSN SENDERS」)。collectAsList()。得到(0)獲得(0).asInstanceOf [長]

valdFrameRepart = dFrame.repartition(distincCount.toInt/3,DFRAME( 「發件人」))。持續(StorageLevel.MEMORY_AND_DISK) dFrame.unpersist