1
我一直在試圖從一個由雙列組成的1列Spark Dataframe中提取信息並將其放入Breeze SparseVector中。爲此,我遍歷我的1列DataFrame的每個元素,強制它成爲Double,然後將其添加到VectorBuilder。我的VectorBuilder在foreach循環中正確地改變其狀態,然後在循環結束後清除所有更改。爲什麼會發生?有沒有解決方法?建立一個SparseVector出DataFrame中的值
編輯1:
我在本地運行1核心;它不是一個集羣
代碼:
val accum = sc.accumulator(0, "Counter")
def correlate() : Unit = {
val cols = df.columns
val id = cols(0)
val id2 = cols(1)
//id1 and id2 are there for
val df1 = sqlContext.sql(s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)")
/* df1 is a dataframe that has 1 column*/
df1.show();
accum.value_=(0);
/******************** Problem starts here **********************/
val builder = new VectorBuilder[Double](5)
df1.foreach{ x =>
x(0) match{
case d : Double =>
builder.add(accum.value, d);
//This print statement prints out correct values
println(s"index: ${accum.value} value: ${builder(accum.value)}")
accum.value += 1;
println(s"builder's active size in loop: ${builder.activeSize}")
case _ => throw new ClassCastException("Pattern-Matching for Double failed");
}
}
//temp becomes empty at this point
println(s"builder's active size out of loop: ${builder.activeSize}")
val sparse = builder.toSparseVector
sparse.foreachPair{(i,v) => println(s"index: ${i} and value: ${v}")}
}
this.correlate()
輸出:
+-------+
| RowX|
+-------+
| 145.0|
| -1.0|
|-212.21|
| 23.3|
| 21.4|
+-------+
index: 0 value: 145.0
builder's active size in loop: 1
index: 1 value: -1.0
builder's active size in loop: 2
index: 2 value: -212.21
builder's active size in loop: 3
index: 3 value: 23.3
builder's active size in loop: 4
index: 4 value: 21.4
builder's active size in loop: 5
//the loop ends here and builder's state disappears
builder's active size out of loop: 0
index: 0 and value: 0.0
index: 1 and value: 0.0
index: 2 and value: 0.0
index: 3 and value: 0.0
index: 4 and value: 0.0