2017-06-10 60 views
0

我有一個輸入文件test-reading.csv星火數據集 - 讀CSV並寫入空輸出

id,sku,price 
"100002701--425370728",100002701,12159 
"100002701--510892030",100002701,11021 
"100002701-235195215",100002701,12330 
"100002701-110442364",100002701,9901 
"100002701-1963746094",100002701,11243 

我纔能有我所面臨的問題的一個最小的,完整的,和可覈查的例子寫了下面的源代碼。

有一個用於讀取CSV文件的ReadingRecord類和用於寫入輸出的WritingRecord。順便說一句,現在它們幾乎完全相同,但在真正的程序中卻有很大不同,因爲它們代表了輸入和輸出結構。

其餘代碼開始Spark,讀取CSV,將ReadingRecord映射到WritingRecord並寫入輸出CSV。

現在的問題是:爲什麼如果我將for循環取消註釋爲flatMapGroups方法,則此Spark程序將停止寫入CSV輸出?

case class ReadingRecord(var id: String, var sku: Integer, var price: Integer) { 
    def toWritingRecord(): WritingRecord = { 
    new WritingRecord(this.id, this.sku, this.price) 
    } 
} 

case class WritingRecord(var id: String, var sku: Integer, var price: Integer) 

object ReadingRecordEncoders { 
    implicit def ReadingRecordEncoder: org.apache.spark.sql.Encoder[ReadingRecord] = 
    org.apache.spark.sql.Encoders.kryo[ReadingRecord] 
} 

object WritingTest { 

    def main(args: Array[String]) { 

    val conf = new SparkConf() 
     .setMaster("local[8]") 
     .setAppName("writing-test") 
     .set("spark.executor.memory", "1gb") 
     .set("spark.num.executors", "8") 
     .set("spark.executor.heartbeatInterval", "120") 

    val spark = SparkSession.builder().config(conf).getOrCreate() 

    import spark.implicits._ 
    import ReadingRecordEncoders._ 

    val data = spark.read.option("header", "true") 
     .option("delimiter", ",") 
     .option("inferSchema", "true") 
     .csv("test-reading.csv") 
     .map(r => { 
     println(r) 
     new ReadingRecord(r(0).asInstanceOf[String], r(1).asInstanceOf[Integer], r(2).asInstanceOf[Integer]) 
     }).groupByKey(r1 => r1.sku) 

    val data1 = data.flatMapGroups((a: Integer, b: Iterator[ReadingRecord]) => { 
     var list = new ArrayList[ReadingRecord] 
     try { 
     //  for (o <- b) { 
     //   list.add(o) 
     //  } 
     } finally { 
     list.clear() 
     list = null 
     } 

     b.map(f => f.toWritingRecord) 
    }) 

    data1.printSchema() 

    data1.write 
     .format("csv") 
     .option("header", "true") 
     .save("output.csv") 
    } 
} 

回答

1

隨着註釋掉的代碼包括在內,你想重用Iteratorb。在使用時的Iterator修改:

這是特別重要的要注意的是,除非另有說明,一個調用它的方法後,不應該使用迭代器。

請參閱the Iterator documentation