
Write HDFS output file using Scala

I am trying to write to an HDFS output file using Scala and I am getting the error below:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:

For every 23 input lines I need to write one line to the output file.

Source code:

package com.mycode.logs; 

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark._ 
import org.apache.spark.deploy.SparkHadoopUtil 
import org.apache.spark.sql._ 
import org.apache.spark.sql.hive.HiveContext 
import scala.io._ 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import java.io.PrintWriter; 

/** 
* @author RondenaR 
* 
*/ 
object NormalizeMSLogs{ 

  def main(args: Array[String]) {
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String) {
    System.out.println("INFO: ****************** started ******************")

    // **** SetMaster is Local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count: Int = 0
    var lineF: String = ""

    hdfs.globStatus(sourcePath).foreach { fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()

      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for (line <- hdfsfileIn) {
        //System.out.println("line = " + line)
        count += 1

        if (count != 23) {
          lineF = lineF + line + ", "
        }

        if (count == 23) {
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF)
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop
    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
} 

You are trying to use 'writer' inside a distributed block, which looks suspicious to me. I would try 'map' instead of 'foreach'; then you get an RDD as the result, which you can iterate over and read/write. In any case you will probably need a shuffle stage; IMO there is no avoiding it, since HDFS has its own ideas about how to distribute files. –
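
A minimal sketch of what the commenter suggests: keep the work inside RDD transformations and let Spark write the result to HDFS, instead of calling a driver-side PrintWriter from inside foreach. It reuses hdfsfileIn, fileName and msNode from the per-file loop in the question; grouping every 23 lines with zipWithIndex, and the output path, are my assumptions about the intended format.

// Sketch: build the combined lines as an RDD and let Spark write them.
val combined = hdfsfileIn
  .zipWithIndex()                                        // keep each line's position
  .map { case (line, idx) => (idx / 23, (idx, line)) }   // bucket every 23 consecutive lines together
  .groupByKey()                                          // shuffle: gather each bucket on one executor
  .map { case (_, linesWithIdx) =>
    val lines = linesWithIdx.toSeq.sortBy(_._1).map(_._2)
    lines.mkString(", ") + ", " + msNode                 // msNode is a plain String, so it serializes fine
  }

combined.saveAsTextFile("hdfs://192.168.248.130:8020/tmp/mySample_" + fileName)

The groupByKey is the shuffle stage the commenter mentions; the PrintWriter disappears from the executor side entirely.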


After normalizing the files, could I output the result to a list and, once the list is complete, load it into a HIVE table? –
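
A hedged sketch of that idea, reusing the combined RDD from the sketch above: turn the normalized lines into a DataFrame and save it through the HiveContext. The schema, column name and table name are made up for illustration.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumed schema: a single column holding each normalized line.
val schema = StructType(Seq(StructField("normalized_line", StringType, nullable = true)))
val rowsDF = hiveContext.createDataFrame(combined.map(Row(_)), schema)

rowsDF.write.mode("append").saveAsTable("normalized_ms_logs")   // hypothetical table name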

Answer


It is not only the PrintWriter object writer that is not serializable: you also cannot put the SparkContext (sc) inside the foreach. It is a construct that only makes sense on the driver and is not shipped over the wire to the workers.
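
If the combined output really does need to go through a single PrintWriter, the writer can stay on the driver and only plain Strings cross the wire. A minimal sketch, assuming the per-file output is small enough to bring back to the driver (toLocalIterator pulls it back one partition at a time; collect() would also do for small results), reusing combined and hdfs from above:

// Sketch: the PrintWriter never enters a closure; only Strings travel between executors and driver.
val writer = new PrintWriter(hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt")))
try {
  combined.toLocalIterator.foreach { lineF =>
    writer.write(lineF)
    writer.write("\n")
  }
} finally {
  writer.close()
}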

You should spend some time thinking about which kinds of objects make sense to send over the wire. Pointers, streams, and handles make no sense there. Structs, strings, and primitives: those are what it makes sense to capture in a closure (or to broadcast).
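
A small sketch of that distinction: plain values such as Strings or Maps can be captured in the closure or broadcast, while a handle such as a PrintWriter cannot (the lookup table here is a made-up example):

// Fine: serializable values captured by the closure or shared via broadcast.
val nodeLabel: String = "node01"                              // plain String, travels with the closure
val nodeNames = sc.broadcast(Map(1 -> "nodeA", 2 -> "nodeB")) // read-only data shipped once per executor

val tagged = hdfsfileIn.map { line =>
  line + ", " + nodeLabel + ", " + nodeNames.value.getOrElse(1, "unknown")
}

// Not fine: a PrintWriter captured here triggers exactly the Task not serializable error above.
// val writer = new PrintWriter(...)
// hdfsfileIn.foreach(line => writer.write(line))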