1

我有以下的表存儲在蜂巢稱爲ExampleData:成羣Spark2數據幀/ RDD過程

+--------+-----+---| 
|Site_ID |Time |Age| 
+--------+-----+---| 
|1  |10:00| 20| 
|1  |11:00| 21| 
|2  |10:00| 24| 
|2  |11:00| 24| 
|2  |12:00| 20| 
|3  |11:00| 24| 
+--------+-----+---+ 

我需要能夠通過網站來處理數據。不幸的是,按網站劃分它不起作用(有超過10萬個網站,所有的數據量都很小)。

對於每一個網站,我需要單獨選擇時間列和年齡列,並用它來喂成一個功能(在理想情況下我想在執行人運行,而不是司機)

我已經得到了一個我認爲我想讓它工作的存根,但這個解決方案只能在驅動程序上運行,所以速度很慢。我需要找到寫它的一種方式,以便它會運行一個執行程序級別:

// fetch a list of distinct sites and return them to the driver 
//(if you don't, you won't be able to loop around them as they're not on the executors) 
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10") 
.collect 

val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData") 

distinctSites.foreach(row => { 
    allSiteData.filter("site_id = " + row.get(0)) 
    val times = allSiteData.select("time").collect() 
    val ages = allSiteData.select("ages").collect() 
    processTimesAndAges(times, ages) 
}) 

def processTimesAndAges(times: Array[Row], ages: Array[Row]) { 
    // do some processing 
} 

我已經試過所有節點廣播distinctSites,但這並不能證明卓有成效。

這似乎是一個這麼簡單的概念,但我花了幾天的時間來研究這個。我對Scala/Spark很新,很抱歉,如果這是一個荒謬的問題!

任何意見或建議,非常感謝。

回答

1

RDD API提供了許多函數,可用於以低級別repartition/repartitionAndSortWithinPartitions開始並以多個* byKey方法(combineByKey,groupByKey,reduceByKey等)結尾的組中執行操作。

實施例:

rdd.map(tup => ((tup._1, tup._2, tup._3), tup)). 
    groupByKey(). 
    forEachPartition(iter => doSomeJob(iter)) 

在數據幀可以使用聚集函數,GroupedData類提供了許多爲最常見的功能,包括計數方法,最大值,最小值,平均值和總結

實施例:

val df = sc.parallelize(Seq(
     (1, 10.3, 10), (1, 11.5, 10), 
     (2, 12.6, 20), (3, 2.6, 30)) 
    ).toDF("Site_ID ", "Time ", "Age") 

df.show() 

+--------+-----+---+ 
|Site_ID |Time |Age| 
+--------+-----+---+ 
|  1| 10.3| 10| 
|  1| 11.5| 10| 
|  2| 12.6| 20| 
|  3| 2.6| 30| 
+--------+-----+---+ 


    df.groupBy($"Site_ID ").count.show 

+--------+-----+ 
|Site_ID |count| 
+--------+-----+ 
|  1| 2| 
|  3| 1| 
|  2| 1| 
+--------+-----+ 

注意:正如你所說的解決方案非常慢,你需要使用分區,在你的情況下,範圍分區是很好的選擇。

+0

謝謝!這是groupByKey()讓我到了我需要的地方。非常感謝,並感謝您的快速回復。 –