0
我是新來的scala以及FOR spark,請幫我解決這個問題。 在spark殼中當我單獨加載下面的函數時它們運行時沒有任何異常,當我在scala對象中拷貝這個函數,並且在spark殼中加載相同的文件時它們會拋出任務沒有序列化試圖在「processbatch」並行。 爲同一PFB代碼:org.apache.spark.SparkException:任務不可序列化(scala)
import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.spark.sql.hive.HiveContext
object Process {
val hc = new HiveContext(sc)
def processsingle(wait: Int, patient: org.apache.spark.sql.Row, visits: Array[org.apache.spark.sql.Row]) : String = {
var out = new StringBuilder()
val processStart = getTimeInMillis()
for(x <- visits) {
out.append(", " + x.getAs("patientid") + ":" + x.getAs("visitid"))
}
}
def processbatch(batch: Int, wait: Int, patients: Array[org.apache.spark.sql.Row], visits: Array[org.apache.spark.sql.Row]) = {
val out = sc.parallelize(patients, batch).map(r=> processsingle(wait, r, visits.filter(f=> f.getAs("patientid") == r.getAs("patientid")))).collect()
for(x <- out) println(x)
}
def processmeasures(fetch: Int, batch: Int, wait: Int) = {
val patients = hc.sql("SELECT patientid FROM tableName1 order by p_id").collect()
val visit = hc.sql("SELECT patientid, visitid FROM tableName2")
val count = patients.length
val fetches = if(count % fetch > 0) (count/fetch + 1) else (count/fetch)
for(i <- 0 to fetches.toInt-1){
val startFetch = i*fetch
val endFetch = math.min((i+1)*fetch, count.toInt)-1
val fetchSize = endFetch - startFetch + 1
val fetchClause = "patientid >= " + patients(startFetch).get(0) + " and patientid <= " + patients(endFetch).get(0)
val fetchVisit = visit.filter(fetchClause).collect()
val batches = if(fetchSize % batch > 0) (fetchSize/batch + 1) else (fetchSize/batch)
for(j <- 0 to batches.toInt-1){
val startBatch = j*batch
val endBatch = math.min((j+1)*batch, fetch.toInt)-1
println(s"Batch from $startBatch to $endBatch");
val batchVisits = fetchVisit.filter(g => g.getAs[Long]("patientid") >= patients(i*fetch + startBatch).getLong(0) && g.getAs[Long]("patientid") <= patients(math.min(i*fetch + endBatch + 1, endFetch)).getLong(0))
processbatch(batch, wait, patients.slice(i*fetch + startBatch, i*fetch + endBatch + 1), batchVisits)
}
}
println("Processing took " + getExecutionTime(processStart) + " millis")
}
}
謝謝,它解決了問題,但它是明智的擴展序列化的每一個地方 – Kalpesh
如果你想傳遞的東西行動/轉化,然後包裝類必須是可序列化的。真相被告知你的代碼有多個問題,序列化是最不重要的。 – zero323
感謝您的快速響應,因爲我寫這些東西對我來說是非常新的,請您提一下這些問題,以便我可以糾正。 – Kalpesh