2016-04-06 29 views
7

我寫過一個方法,必須考慮一個隨機數來模擬伯努利分佈。我正在使用random.nextDouble生成0到1之間的數字,然後根據給定我的概率參數的值作出我的決定。Spark - Random Number Generation

我的問題是Spark在我for循環映射函數的每次迭代中都會生成相同的隨機數。我正在使用DataFrame API。我的代碼格式如下:

val myClass = new MyClass() 
val M = 3 
val myAppSeed = 91234 
val rand = new scala.util.Random(myAppSeed) 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 

這裏是類:

class myClass extends Serializable { 
    val q = qProb 

    def myMethod(s: String, rand: Double) = { 
    if (rand <= q) // do something 
    else // do something else 
    } 
} 

我需要一個新的隨機數,每次myMethod被調用。我也試過產生與java.util.Random我的方法裏數(scala.util.Random V10不延長Serializable)像下面,但我仍然得到在每一個同一個號碼循環

val r = new java.util.Random(s.hashCode.toLong) 
val rand = r.nextDouble() 

我做了一些研究,看起來這與Sparks的確定性本質有關。

回答

2

之所以相同序列重複是隨機生成器中創建,用數據劃分前的種子初始化。然後每個分區從相同的隨機種子開始。也許不是最有效的方式做到這一點,但下面應該工作:

val myClass = new MyClass() 
val M = 3 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{ 
     val rand = scala.util.Random 
     row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 
+0

我修改這個稍微解決我的問題。我將Random val傳入我的方法,並從那裏生成隨機數。這解決了我的問題,但出於序列化原因,我不得不使用java.util.Random'。 –

4

只需使用SQL函數rand

import org.apache.spark.sql.functions._ 

//df: org.apache.spark.sql.DataFrame = [key: int] 

df.select($"key", rand() as "rand").show 
+---+-------------------+ 
|key|    rand| 
+---+-------------------+ 
| 1| 0.8635073400704648| 
| 2| 0.6870153659986652| 
| 3|0.18998048357873532| 
+---+-------------------+ 


df.select($"key", rand() as "rand").show 
+---+------------------+ 
|key|    rand| 
+---+------------------+ 
| 1|0.3422484248879837| 
| 2|0.2301384925817671| 
| 3|0.6959421970071372| 
+---+------------------+ 
+0

這並沒有完全解決我的問題,但其優雅的解決方案,我可能會被使用在未來,所以+1 –

2

this post,最好的辦法是不要把new scala.util.Random地圖裏面,也不是完全外(即在驅動程序代碼。 ),但在中間mapPartitionsWithIndex

import scala.util.Random 
val myAppSeed = 91234 
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) => 
    val rand = new scala.util.Random(indx+myAppSeed) 
    iter.map(x => (x, Array.fill(10)(rand.nextDouble))) 
}