I am trying to write an HDFS output file using Scala, and I get the error below:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:
Every 23 lines I need to write one line to the output file.
Source code:
package com.mycode.logs

import java.io.PrintWriter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

import scala.io._
/**
* @author RondenaR
*
*/
object NormalizeMSLogs {

  def main(args: Array[String]) {
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String) {
    System.out.println("INFO: ****************** started ******************")

    // **** setMaster is local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count: Int = 0
    var lineF: String = ""

    hdfs.globStatus(sourcePath).foreach { fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()
      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for (line <- hdfsfileIn) {
        //System.out.println("line = " + line)
        count += 1
        if (count != 23) {
          lineF = lineF + line + ", "
        }
        if (count == 23) {
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF)
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop

    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
}
You are trying to use 'writer' inside a distributed block, which looks suspicious to me. I would try 'map' instead of 'foreach'; then you get an RDD as the result, which you can iterate over and read/write. In any case you will probably need a shuffle stage; IMO there is no way around it, since HDFS has its own ideas about how to distribute the files. –
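A minimal sketch of that idea, replacing the inner for (line <- hdfsfileIn) loop from the question: the 23-line merge is expressed with RDD transformations, and only the collected result (a plain Array[String]) reaches the driver, so the PrintWriter is never captured in a closure. 'writer' and 'msNode' are the values from the question; the bucketing by idx / 23 is just one way to express the grouping, not the only one.

// Sketch only: merge every 23 consecutive lines on the executors,
// then write the merged rows with the driver-side PrintWriter.
val hdfsfileIn = sc.textFile(filePathName)

val merged: Array[String] = hdfsfileIn
  .zipWithIndex()                                        // (line, 0-based position)
  .map { case (line, idx) => (idx / 23, (idx, line)) }   // bucket = 23 consecutive lines
  .groupByKey()                                          // this is the shuffle mentioned above
  .sortByKey()
  .map { case (_, bucket) =>
    bucket.toSeq.sortBy(_._1).map(_._2).mkString(", ") + ", " + msNode
  }
  .collect()                                             // only serializable Strings reach the driver

merged.foreach { row =>
  writer.write(row)
  writer.write("\n")
}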
After normalizing the files, could I collect the output into a list and, once the list is complete, load it into a HIVE table? –
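That should be possible; a rough sketch, assuming Spark 1.x with the hiveContext from the question, the 'merged' array from the sketch above, and placeholder table/column names (normalized_ms_logs, line):

// Sketch only: turn the normalized lines into a DataFrame and load them into Hive.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("line", StringType, nullable = true)))
val rowRdd = sc.parallelize(merged).map(Row(_))
val df = hiveContext.createDataFrame(rowRdd, schema)

df.registerTempTable("tmp_normalized_logs")
hiveContext.sql("CREATE TABLE IF NOT EXISTS normalized_ms_logs (line STRING)")
hiveContext.sql("INSERT INTO TABLE normalized_ms_logs SELECT line FROM tmp_normalized_logs")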