2017-06-21 26 views
0

假設我們有一個100個元素的整數數組。如何有效地計算一個數組的索引,其中累積和超過了Scala中的閾值--Spark?

val a = Array(312, 102, 95, 255, ...)

我想找到陣列,其中所述第一k+1元素的累計總和大於一定閾值的索引(比如k),但是用於第一k元件較少。

由於高編號的數組中的元素的,我所估計的下部和上部的索引,其中在所述k指數之間應該是:

k_Lower <= k <= k_upper

我的問題是,什麼是最好的方式找到這個k索引?

我用while loopk_lower = 30; k_upper = 47試過和threshold = 20000

var sum = 0 
var k = 30 
while (k <= 47 && sum <= 20000) { 
    sum = test.take(k).sum 
    k += 1 
} 

print(k-2) 

我得到正確的答案,但我敢肯定,有一個更有效或「斯卡拉十歲上下的」解決方案,這一點,我在斯卡拉真的很新。我也必須在Spark中實現這一點。

另一個問題是: 爲了優化查找k索引的方法中,我想用二進制搜索,其中minmaxk_lower,各自k_upper的。但是我嘗試實現這一點並不成功。我應該怎麼做?

我使用Scala的2.10.6和Spark 1.6.0

更新!

我以爲這種做法是對我的問題一個很好的解決方案,但現在我認爲,我走近它錯誤。我實際的問題是:

我有一堆JSON-S,這是基於裝入星火作爲RDD與

val eachJson = sc.textFile("JSON_Folder/*.json")

我要拆分的數據分成若干分區的尺寸。級聯的JSON-s大小應該低於閾值。我的想法是逐個瀏覽RDD,計算JSON的大小並將其添加到累加器中。當累加器是大於閾值,那麼我刪除最後一個JSON和我取得了新的RDD所有的JSON-S,直到這個問題,我用剩下的JSON-S再次這樣做。我讀到這可能是一個解決方案尾遞歸,但我沒能實現它,所以我試圖以不同的方式解決這個問題。我爲每個JSON映射了大小,並且獲得了RDD [Int]。我設法得到這個數組,其中累計總和超過閾值的所有索引:

def calcRDDSize(rdd: RDD[String]): Long = { 
    rdd.map(_.getBytes("UTF-8").length.toLong) 
    .reduce(_ + _) //add the sizes together 
} 

val jsonSize = eachJson.map(s => s.getBytes("UTF-8").length) 
val threshold = 20000 
val totalSize = calcRDDSize(eachJson) 
val numberOfPartitions = totalSize/threshold 

val splitIndexes = scala.collection.mutable.ArrayBuffer.empty[Int] 
var i = 0 
while (i < numberOfPartitions) 
    { 
    splitIndexes += jsonSize.collect().toStream.scanLeft(0){_ + _}.takeWhile(_ < (i+1)*threshold).length-1 
    i = i + 1 
    } 

不過,我不喜歡這樣的解決方案,因爲在while循環我要經過好幾次的流這並不是很有效。現在我已經在那裏我有分裂RDD指標,但我不知道如何分割的。

+0

這是通過一些數據的線性掃描。 Spark將會是一個非常糟糕的比賽(你可能可以用mapPartitons做些什麼,但它仍然可能做比需要更多的工作)。爲什麼你必須使用Spark? –

+0

「由於陣列中元素的數量很多,我估計在k指數之間應該有一個較低和較高的指數:」。這有什麼幫助?您仍然需要將低於索引的所有元素進行總和。出於同樣的原因,二進制搜索沒有幫助。沒有必要爲您嘗試的每個索引重新計算總和。 –

+0

我必須使用Spark,因爲這是一項要求。我認爲這會優化一點,因爲我想計算每個指數的總和(我知道這是一個不好的解決方案),但我不想計算總和,比如前2個或前3個元素。但我同意你的看法,我不應該爲每個指數計算它。請注意,我更新了問題描述! – sanyi14ka

回答

1

我想這與scanLeft進一步優化該使用延遲集合

a 
.toStream 
.scanLeft(0){_ + _} 
.tail 
.zipWithIndex 
.find{case(cumsum,i) => cumsum > limit} 
+1

爲什麼'zipWIthIndex'?只需執行'a.toStrean.scanLeft(0){_ + _} .. tail.takeWhile(_

+0

@ TheArchetypalPaul謝謝,我添加了'.tail'。我知道使用'takeWhile'時''zipWithIndex'沒有必要,但我認爲這是我的解決方案更「說」 –

+0

我覺得很困惑。除了最後一個元素以外的任何元素的索引不需要爲什麼花時間和空間來添加它。注意你的解決方案產生一個'Option',而不是一個索引(當你的累積和總是不超過'k'時,你需要考慮做什麼) –