我是Apache Spark的新手,正在學習基本功能。 有一個小小的懷疑。假設我有一個元組的RDD(鍵,值),並希望從它們中獲得一些獨特的元組。我使用distinct()函數。我想知道函數在什麼基礎上認爲元組是不同的..?它是基於鍵或值還是兩者?Distinct()函數在Spark中如何工作?
回答
爲RDD.distinct() API文檔只提供一個一句話描述:
「返回一個新的RDD包含此RDD的不同的元素。」
根據最近的經驗,我可以告訴你,在一個元組-RDD中,元組作爲一個整體被考慮。
如果你想不同的鍵或不同的值,然後根據你想要完成什麼,你可以:
A.呼叫groupByKey()
改造{(k1,v11),(k1,v12),(k2,v21),(k2,v22)}
到{(k1,[v11,v12]), (k2,[v21,v22])}
;或
B.通過調用keys()
或values()
其次distinct()
去掉無論是鍵或值到本文截稿時(2015年6月),加州大學伯克利分校+ EDX運行免費的在線課程Introduction to Big Data and Apache Spark這將在提供手練習這些功能。
distinct
使用hashCode
和equals
方法確定對象。元組內置了平等機制,將其分解爲每個對象的平等和位置。因此,distinct
將對整個Tuple2
對象起作用。正如Paul指出的那樣,您可以撥打keys
或values
,然後撥打distinct
。或者你可以通過aggregateByKey
編寫自己獨特的值,這將保持密鑰配對。或者如果你想要不同的密鑰,那麼你可以使用常規的aggregate
謝謝!說得通。 –
.distinct()肯定是在跨分區進行洗牌。要查看更多情況,請在RDD上運行.toDebugString。
val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
其中,用於RDD例如我有(myRDDPreStep已經由密鑰散列分區,由StorageLevel.MEMORY_AND_DISK_SER依然存在,檢查點),則返回:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
注意,有可能更有效特別是如果您的RDD已經以智能方式進行了分區並且分區不會過度傾斜,那麼您可以採取各種方法來減少混洗。
見Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct? 和 Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?
它看起來像distinct
將擺脫(鍵,值)重複。
在下面的示例(1,20)和(2,20)在myRDD
中重複兩次,但在distinct()
之後,刪除了重複項。
scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22
scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)
scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
Justin Pihony是對的.Distinct使用hashCode和equals方法確定對象。它的返回不同元素(對象)
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
鮮明
rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
如果你想申請的關鍵區別。 在這種情況下減少是更好的選擇
ReduceBy
val reduceRDD= rdd.map(tup =>
(tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)
reduceRDD.collect().foreach(println)
輸出: -
(2,20)
(1,20)
(3,21)
- 1. Distinct()如何工作?
- 2. Spark Streaming:mapWithState函數如何在集羣中工作?
- 3. Apache Spark - 異常處理如何在映射函數中工作
- 4. SELECT DISTINCT在MySQL中如何工作?
- 5. MySQL DISTINCT函數無法正常工作
- 6. DISTINCT如何在內部工作?
- 7. VectorSlicer如何在Spark 2.0中工作?
- 8. AWS lambda函數提交spark工作
- 9. DISTINCT不工作
- 10. spark sc.textfile如何工作?
- 11. 如何在Spark工作器上調試映射函數中的錯誤?
- 12. max()函數在Jython中如何工作?
- 13. dist函數在MATLAB中如何工作?
- 14. scanf函數在C中如何工作?
- 15. malloc函數如何在C中工作?
- 16. 在Swift中,forEach函數如何工作
- 17. rem函數如何在matlab中工作
- 18. scanf函數如何在C中工作?
- 19. LN()函數如何在Excel中工作?
- 20. EOF函數如何在c中工作?
- 21. prop.table()函數如何在r中工作?
- 22. SQL的DISTINCT子句如何工作?
- 23. spark的HiveContext如何在內部工作?
- 24. Spark:如何在Dataframe API中轉換count(distinct(value))
- 25. SELECT DISTINCT不工作
- 26. (Django)如何使用distinct()函數?
- 27. 函數內部函數如何工作?
- 28. recv函數如何工作-winsock函數
- 29. 循環在函數外工作,但在函數中不工作。
- 30. 箭頭函數如何在構造函數中工作?
保羅嗨!假設我們有一個RDD元組如下: (1,20),(1,21),(1,20),(2,20),(2,22),(2,20),( 3,21),(3,22)..等, 在這裏你可以觀察到鍵和值都在各種元組中重複。 因此,如果我在上述RDD上應用distinct(),結果是什麼? 請花點時間。謝謝! 而且,是的,我在線上課! :) –
我現在沒時間了,但是你可以用'myRDD = sc.parallelize([(1,20),(1,21),(1,20),(2,20) ),(2,22),(2,20),(3,21),(3,22)]);'這甚至可以在Spark課程的其中一個實驗室筆記本中工作。然後運行'myRDD.distinct()。collect()來測試輸出' – Paul