2016-04-25 47 views
0

我有型Seq[Seq[(Double, Double)]]的變量:如何使用期望RDD [(Double,Double)]作爲Seq [Seq [(Double,Double)]]的輸入的函數?

val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) 

現在我想申請功能RegressionMetrics這需要RDD[(Double, Double)]作爲輸入:

val metrics = new RegressionMetrics(output) 

如何變換Seq[Seq[(Double, Double)]]到RDD [(雙人間, Double)]`爲了能夠使用類RegressionMetrics的函數?

回答

1

RDD就是Apache斯巴克的一個分佈式彈性數據集

要創建一個RDD你需要的SparkContext一個實例,它可以被看作是一個「連接」或「處理」到抽象運行Apache Spark的集羣

假設

  • 你有一個實例SparkContext
  • 你要正確對待你的輸入作爲(Double, Double)值的「扁平化」的序列,忽略的方式,這些都是目前「拆分」成在Seq[Seq[(Double, Double)]]中的子序列

您可以創建一個RDD,如下所示:

val sc: SparkContext = ??? 
val output: Seq[Seq[(Double, Double)]] = ??? 

val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten) 
+0

好的,謝謝。我正在使用Seq進行測試。那麼,我是否理解正確,如果使用磁盤上存儲的一些數據,我可以將它讀入RDD並替換Seq? – Klue

+0

正確!對於測試 - 首先,您可以使用Spark的本地模式輕鬆創建獨立的SparkContext;其次,實際上你可以按照這裏所建議的使用'parallelize',或者直接通過'SparkContext.textFile'從文件中加載數據 –

相關問題