假設我們有一個100個元素的整數數組。如何有效地計算一個數組的索引,其中累積和超過了Scala中的閾值--Spark?
val a = Array(312, 102, 95, 255, ...)
我想找到陣列,其中所述第一k+1
元素的累計總和大於一定閾值的索引(比如k
),但是用於第一k
元件較少。
由於高編號的數組中的元素的,我所估計的下部和上部的索引,其中在所述k
指數之間應該是:
k_Lower <= k <= k_upper
我的問題是,什麼是最好的方式找到這個k
索引?
我用while loop
時k_lower = 30; k_upper = 47
試過和threshold = 20000
var sum = 0
var k = 30
while (k <= 47 && sum <= 20000) {
sum = test.take(k).sum
k += 1
}
print(k-2)
我得到正確的答案,但我敢肯定,有一個更有效或「斯卡拉十歲上下的」解決方案,這一點,我在斯卡拉真的很新。我也必須在Spark中實現這一點。
另一個問題是: 爲了優化查找k
索引的方法中,我想用二進制搜索,其中min
和max
值k_lower
,各自k_upper
的。但是我嘗試實現這一點並不成功。我應該怎麼做?
我使用Scala的2.10.6和Spark 1.6.0
更新!
我以爲這種做法是對我的問題一個很好的解決方案,但現在我認爲,我走近它錯誤。我實際的問題是:
我有一堆JSON-S,這是基於裝入星火作爲RDD與
val eachJson = sc.textFile("JSON_Folder/*.json")
我要拆分的數據分成若干分區的尺寸。級聯的JSON-s大小應該低於閾值。我的想法是逐個瀏覽RDD,計算JSON的大小並將其添加到累加器中。當累加器是大於閾值,那麼我刪除最後一個JSON和我取得了新的RDD所有的JSON-S,直到這個問題,我用剩下的JSON-S再次這樣做。我讀到這可能是一個解決方案尾遞歸,但我沒能實現它,所以我試圖以不同的方式解決這個問題。我爲每個JSON映射了大小,並且獲得了RDD [Int]。我設法得到這個數組,其中累計總和超過閾值的所有索引:
def calcRDDSize(rdd: RDD[String]): Long = {
rdd.map(_.getBytes("UTF-8").length.toLong)
.reduce(_ + _) //add the sizes together
}
val jsonSize = eachJson.map(s => s.getBytes("UTF-8").length)
val threshold = 20000
val totalSize = calcRDDSize(eachJson)
val numberOfPartitions = totalSize/threshold
val splitIndexes = scala.collection.mutable.ArrayBuffer.empty[Int]
var i = 0
while (i < numberOfPartitions)
{
splitIndexes += jsonSize.collect().toStream.scanLeft(0){_ + _}.takeWhile(_ < (i+1)*threshold).length-1
i = i + 1
}
不過,我不喜歡這樣的解決方案,因爲在while循環我要經過好幾次的流這並不是很有效。現在我已經在那裏我有分裂RDD指標,但我不知道如何分割的。
這是通過一些數據的線性掃描。 Spark將會是一個非常糟糕的比賽(你可能可以用mapPartitons做些什麼,但它仍然可能做比需要更多的工作)。爲什麼你必須使用Spark? –
「由於陣列中元素的數量很多,我估計在k指數之間應該有一個較低和較高的指數:」。這有什麼幫助?您仍然需要將低於索引的所有元素進行總和。出於同樣的原因,二進制搜索沒有幫助。沒有必要爲您嘗試的每個索引重新計算總和。 –
我必須使用Spark,因爲這是一項要求。我認爲這會優化一點,因爲我想計算每個指數的總和(我知道這是一個不好的解決方案),但我不想計算總和,比如前2個或前3個元素。但我同意你的看法,我不應該爲每個指數計算它。請注意,我更新了問題描述! – sanyi14ka