之後緩存我有我repartitined基於節點我需要repartitining
val config=new SparkConf().setAppName("MyHbaseLoader").setMaster("local[10]")
val context=new SparkContext(config)
val sqlContext=new SQLContext(context)
val rows="sender,time,time(utc),reason,context-uuid,rat,cell-id,first-pkt,last-pkt,protocol,sub-proto,application-id,server-ip,server-domain-name, http-proxy-ip,http-proxy-domain-name, video,packets-dw, packets-ul, bytes-dw, bytes-ul"
val scheme= new StructType(rows.split(",").map(e=>new StructField(e.trim,StringType,true)))
val dFrame=sqlContext.read
.schema(scheme)
.format("csv")
.load("E:\\Users\\Mehdi\\Downloads\\ProbDocument\\ProbDocument\\ggsn_cdr.csv")
dFrame.registerTempTable("GSSN")
dFrame.persist(StorageLevel.MEMORY_AND_DISK)
val distincCount=sqlContext.sql("select count(distinct sender) as SENDERS from GSSN").collectAsList().get(0).get(0).asInstanceOf[Long]
dFrame.repartition(distincCount.toInt/3,dFrame("sender"))
我需要重新分區對數據幀下一減少工作後再次打電話給我presist方法在其主鍵MADDE一個dataaframe?
糾正我,如果我錯了,但我認爲在洗牌過程中沒有緩存,只有地圖階段的結果放到磁盤。如果它們在OS緩存中,則從這些文件重新計算ShuffleRDD是快速的,但不能保證。 – Dikei
@Dikei你沒有錯,但它不影響我的意見。如果OP從一開始就接受寫入磁盤的操作,那麼將兩次相同的數據放到磁盤上是非常浪費的。既然緩存既不是免費的也不是保證的,我在這裏沒有看到任何這種冗餘的理由。它只是在內存中,或者是堆外的,也許......但我可能是錯的。你認爲這裏真的有意義嗎? – zero323
我認爲這取決於程序的其餘部分。 – Dikei