2016-12-13 18 views
0

我想用spark創建一些時間序列模型。第一步是將序列數據重新格式化爲訓練樣本。我們的想法是:使用Spark重新格式化/轉換時間序列數據的有效方法

原始順序數據(各t *是一個數字)

t1 t2 t3 t4 t5 t6 t7 t8 t9 t10 

期望的輸出

t1 t2 t3 t4 t5 t6 
t2 t3 t4 t5 t6 t7 
t3 t4 t5 t6 t7 t8 
.................. 

如何寫一個函數在火花做到這一點。 函數簽名應該像

格式化(陣列[整數],n:整數)

返回類型是數據幀或向量

==========我嘗試的代碼在星火1.6.1 =========

val arraydata=Array[Double](1,2,3,4,5,6,7,8,9,10) 
val slideddata = arraydata.sliding(4).toSeq 
val rows = arraydata.sliding(4).map{x=>Row(x:_*)} 
sc.parallelize(arraydata.sliding(4).toSeq).toDF("Values") 

最後行不能以錯誤經歷:

Error:(52, 48) value toDF is not a member of org.apache.spark.rdd.RDD[Array[Double]] 
    sc.parallelize(arraydata.sliding(4).toSeq).toDF("Values") 

回答

1

我無法弄清n的意義,因爲它可以用作窗口大小以及它必須移動的值。

因此有兩個口味:

如果n是窗尺寸:

def reformat(arrayOfInteger:Array[Int], shiftValue: Int) ={ 
sc.parallelize(arrayOfInteger.sliding(shiftValue).toSeq).toDF("values") 
} 

On REPL:

scala> def reformat(arrayOfInteger:Array[Int], shiftValue: Int) ={ 
    | sc.parallelize(arrayOfInteger.sliding(shiftValue).toSeq).toDF("values") 
    | } 
reformat: (arrayOfInteger: Array[Int], shiftValue: Int)org.apache.spark.sql.DataFrame 

scala> val arrayofInteger=(1 to 10).toArray 
arrayofInteger: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

scala> reformat(arrayofInteger,3).show 
+----------+ 
| values| 
+----------+ 
| [1, 2, 3]| 
| [2, 3, 4]| 
| [3, 4, 5]| 
| [4, 5, 6]| 
| [5, 6, 7]| 
| [6, 7, 8]| 
| [7, 8, 9]| 
|[8, 9, 10]| 
+----------+ 

如果n的值被移位:

def reformat(arrayOfInteger:Array[Int], shiftValue: Int) ={ 
val slidingValue=arrayOfInteger.size-shiftValue 
sc.parallelize(arrayOfInteger.sliding(slidingValue).toSeq).toDF("values") 
} 

On REPL:

scala> def reformat(arrayOfInteger:Array[Int], shiftValue: Int) ={ 
    | val slidingValue=arrayOfInteger.size-shiftValue 
    | sc.parallelize(arrayOfInteger.sliding(slidingValue).toSeq).toDF("values") 
    | } 
reformat: (arrayOfInteger: Array[Int], shiftValue: Int)org.apache.spark.sql.DataFrame 

scala> val arrayofInteger=(1 to 10).toArray 
arrayofInteger: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

scala> reformat(arrayofInteger,3).show(false) 
+----------------------+ 
|values    | 
+----------------------+ 
|[1, 2, 3, 4, 5, 6, 7] | 
|[2, 3, 4, 5, 6, 7, 8] | 
|[3, 4, 5, 6, 7, 8, 9] | 
|[4, 5, 6, 7, 8, 9, 10]| 
+----------------------+ 
+0

我爲什麼不能做 「toDF」?我只能創建RDD – lserlohn

+0

也許我使用的是Spark 1.6.1,而不是2.0 – lserlohn

+0

對於使用toDF,您必須創建sqlContext,然後導入sqlContext.implicits._,然後只能使用toDF –

相關問題