2015-11-23 57 views
2

(什麼是Scala中應該儘可能在Java中是不可能的,對不對?但我會採取斯卡拉的建議爲好)構建星火JavaRDD列表從DropResult對象

我不是要遍歷一個RDD,而是我需要用稱爲DropResult的類型的隨機/模擬器類中的n個元素構建一個。 DropResult不能被轉換成其他任何東西。

我以爲Spark「find PI」的例子讓我走上正軌,但沒有運氣。這裏就是我想:

在一次性基礎上DropResult作出這樣的: 使從PLD單DropResult(PipeLinkageData)

DropResult dropResultSeed = pld.doDrop(); 

我想是這樣的:

JavaRDD<DropResult> simCountRDD = spark.parallelize(makeRangeList(1, getSimCount())).foreach(pld.doDrop()); 

我只需要在集羣上運行pld.doDrop()大約10^6次,並將結果放入Spark RDD中,以便在集羣上執行下一個操作。我無法弄清楚使用「並行化」來完成這項工作的功能。

makeRangeList:

private List<Integer> makeRangeList(int lower, int upper) { 
    List<Integer> range = IntStream.range(lower, upper).boxed().collect(Collectors.toList()); 
    return range;  
} 

(FWIW我試圖用從http://spark.apache.org/examples.html爲如何做一個for循環創建JavaRDD模型郫縣爲例)

int count = spark.parallelize(makeRange(1, NUM_SAMPLES)).filter(new Function<Integer, Boolean>() { 
    public Boolean call(Integer i) { 
    double x = Math.random(); 
    double y = Math.random(); 
    return x*x + y*y < 1; 
    } 
}).count(); 
System.out.println("Pi is roughly " + 4 * count/NUM_SAMPLES); 
+0

問題關鍵是,我們試圖並行首先填充RDD的過程中,我們無法在一臺機器上運行這麼多計算,因爲它們每個都需要大約六秒。感謝任何想法,Spark必須能夠做到這一點? – JimLohse

回答

1

是啊,似乎就像你應該能夠很容易地做到這一點。聽起來你只需要簡單地並行化一個10^6整數的RDD,這樣你就可以在RDD中創建10^6個DropResult對象。

如果是這種情況,我認爲你不需要像上面那樣明確地創建一個列表。好像你就應該能夠使用makeRange()星火丕例子確實是這樣的方式:

JavaRDD<DropResult> simCountRDD = spark.parallelize(makeRange(1,getSimCount())).map(new Function<Integer, DropResult>() 
{ 
    public DropResult call(Integer i) { 
    return pld.doDrop(); 
    } 
}); 
+0

我看到你在做什麼,我現在正在測試它,我無法弄清楚的一件事是makeRange是否來自一個庫? makeRange需要返回一個數組,我想,看看Accumulator示例http://spark.apache.org/docs/latest/programming-guide.html ...謝謝我會盡快讓你知道 – JimLohse

+0

我認爲給定函數使用DropResult它使並行化想要一個List,因爲我得到的錯誤「JavaSparkContext類型中的方法並行化(列表)不適用於參數(int [])」...函數給出「類型new Function < Integer,DropResult>(){}必須實現繼承的抽象方法Function .apply(Integer)「...在Eclipse中做了清理/保存。既然這可以回讀給主人,我想知道一個累加器會更好嗎? – JimLohse

+1

是的,我不知道makeRange()應該來自哪裏。在這種情況下,我會嘗試使用原始方法makeRangeList()方法,因爲我沒有看到Accumulator將如何提供幫助:JavaRDD simCountRDD = spark.parallelize(makeRangeList(1,getSimCount()))。map(new函數() Public DropResult call(Integer i){ return pld。doDrop(); } }); – burgersmoke