2015-12-30 33 views
0

我是新來的scala以及FOR spark,請幫我解決這個問題。 在spark殼中當我單獨加載下面的函數時它們運行時沒有任何異常,當我在scala對象中拷貝這個函數,並且在spark殼中加載相同的文件時它們會拋出任務沒有序列化試圖在「processbatch」並行。 爲同一PFB代碼:org.apache.spark.SparkException:任務不可序列化(scala)

import org.apache.spark.sql.Row 
import org.apache.log4j.Logger 
import org.apache.spark.sql.hive.HiveContext 

object Process { 
    val hc = new HiveContext(sc) 

    def processsingle(wait: Int, patient: org.apache.spark.sql.Row, visits: Array[org.apache.spark.sql.Row]) : String = { 
     var out = new StringBuilder() 
     val processStart = getTimeInMillis() 
     for(x <- visits) { 
      out.append(", " + x.getAs("patientid") + ":" + x.getAs("visitid")) 
     } 
    } 

    def processbatch(batch: Int, wait: Int, patients: Array[org.apache.spark.sql.Row], visits: Array[org.apache.spark.sql.Row]) = { 
     val out = sc.parallelize(patients, batch).map(r=> processsingle(wait, r, visits.filter(f=> f.getAs("patientid") == r.getAs("patientid")))).collect() 
     for(x <- out) println(x) 
    } 

    def processmeasures(fetch: Int, batch: Int, wait: Int) = { 

     val patients = hc.sql("SELECT patientid FROM tableName1 order by p_id").collect() 
     val visit = hc.sql("SELECT patientid, visitid FROM tableName2") 
     val count = patients.length 
     val fetches = if(count % fetch > 0) (count/fetch + 1) else (count/fetch) 


     for(i <- 0 to fetches.toInt-1){ 
      val startFetch = i*fetch 
      val endFetch = math.min((i+1)*fetch, count.toInt)-1 
      val fetchSize = endFetch - startFetch + 1 
      val fetchClause = "patientid >= " + patients(startFetch).get(0) + " and patientid <= " + patients(endFetch).get(0) 
      val fetchVisit = visit.filter(fetchClause).collect() 

      val batches = if(fetchSize % batch > 0) (fetchSize/batch + 1) else (fetchSize/batch) 
      for(j <- 0 to batches.toInt-1){ 
       val startBatch = j*batch 
       val endBatch = math.min((j+1)*batch, fetch.toInt)-1 

       println(s"Batch from $startBatch to $endBatch"); 
       val batchVisits = fetchVisit.filter(g => g.getAs[Long]("patientid") >= patients(i*fetch + startBatch).getLong(0) && g.getAs[Long]("patientid") <= patients(math.min(i*fetch + endBatch + 1, endFetch)).getLong(0)) 
       processbatch(batch, wait, patients.slice(i*fetch + startBatch, i*fetch + endBatch + 1), batchVisits) 
      } 
     } 
     println("Processing took " + getExecutionTime(processStart) + " millis") 
    } 

} 
+0

謝謝,它解決了問題,但它是明智的擴展序列化的每一個地方 – Kalpesh

+0

如果你想傳遞的東西行動/轉化,然後包裝類必須是可序列化的。真相被告知你的代碼有多個問題,序列化是最不重要的。 – zero323

+0

感謝您的快速響應,因爲我寫這些東西對我來說是非常新的,請您提一下這些問題,以便我可以糾正。 – Kalpesh

回答

1

你應該讓Process對象Serializable

object Process extends Serializable { 
    ... 
} 
相關問題