
Building a SparseVector out of the values in a DataFrame

I have been trying to extract the values from a one-column Spark DataFrame of Doubles and put them into a Breeze SparseVector. To do this, I iterate over every element of my one-column DataFrame, cast it to a Double, and add it to a VectorBuilder. The VectorBuilder changes its state correctly inside the foreach loop, but all of those changes are gone once the loop ends. Why does this happen, and is there a workaround?

編輯1:

I am running this locally on a single core; it is not a cluster.

Code:

import breeze.linalg.VectorBuilder

val accum = sc.accumulator(0, "Counter")

def correlate() : Unit = {

    val cols = df.columns
    val id = cols(0)
    val id2 = cols(1)

    // id and id2 are the two column names; rows where either is null are filtered out
    val df1 = sqlContext.sql(s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)")

    /* df1 is a DataFrame that has 1 column */
    df1.show()
    accum.value = 0

    /******************** Problem starts here **********************/
    val builder = new VectorBuilder[Double](5)
    df1.foreach { x =>
      x(0) match {
        case d : Double =>
          builder.add(accum.value, d)
          // This print statement prints out correct values
          println(s"index: ${accum.value} value: ${builder(accum.value)}")
          accum.value += 1
          println(s"builder's active size in loop: ${builder.activeSize}")
        case _ => throw new ClassCastException("Pattern-Matching for Double failed")
      }
    }
    // builder becomes empty at this point
    println(s"builder's active size out of loop: ${builder.activeSize}")

    val sparse = builder.toSparseVector
    sparse.foreachPair { (i, v) => println(s"index: ${i} and value: ${v}") }
}
this.correlate()

Output:

+-------+ 
| RowX| 
+-------+ 
| 145.0| 
| -1.0| 
|-212.21| 
| 23.3| 
| 21.4| 
+-------+ 

index: 0 value: 145.0 
builder's active size in loop: 1 
index: 1 value: -1.0 
builder's active size in loop: 2 
index: 2 value: -212.21 
builder's active size in loop: 3 
index: 3 value: 23.3 
builder's active size in loop: 4 
index: 4 value: 21.4 
builder's active size in loop: 5 

//the loop ends here and builder's state disappears 

builder's active size out of loop: 0 
index: 0 and value: 0.0 
index: 1 and value: 0.0 
index: 2 and value: 0.0 
index: 3 and value: 0.0 
index: 4 and value: 0.0 

Answer


The foreach adds values to a local copy of the builder on each worker, so the builder on the driver never changes. To get the values into a local object, collect them instead:

SparseVector(df1.rdd.map(_.getDouble(0)).collect) 
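For context, here is a minimal sketch of how the whole correlate() could look with this fix, assuming the same df, sqlContext, and registered "dataset" table as in the question. The vector is built entirely on the driver after collect(), so no executor-side mutation is involved:

import breeze.linalg.SparseVector

// Hypothetical rewrite of correlate(): collect the column to the driver first,
// then build the Breeze vector locally. Assumes df, sqlContext and the
// "dataset" table from the question are in scope.
def correlate(): Unit = {
  val cols = df.columns
  val id   = cols(0)
  val id2  = cols(1)

  val df1 = sqlContext.sql(
    s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)")

  // collect() brings the values back as a local Array[Double] on the driver
  val values = df1.rdd.map(_.getDouble(0)).collect()

  // Build the sparse vector from the local array
  val sparse = SparseVector(values)
  sparse.foreachPair { (i, v) => println(s"index: ${i} and value: ${v}") }
}

Note that this only makes sense when the column fits in driver memory; for larger data the values should stay distributed rather than being collected.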