我正在Scala中開發一個Spark應用程序,並想知道將其並行化並在hadoop集羣上運行的最佳方法。我的代碼會從hdfs文件中讀取每一行,解析它並生成多個記錄(對於每行),我將其作爲一個case類存儲。我已經在getElem()方法中編寫了完整的邏輯,並按預期工作。在斯卡拉設計並行化Spark應用程序的最佳方法
現在,我想計算所有輸入記錄的邏輯並將響應存儲到hdfs位置。
請讓我知道我該如何處理spark和整合爲輸入生成的所有相應輸出記錄並寫入HDFS。
object testing extends Serializable {
var recordArray=Array[Record]();
def main(args:Array[String])
{
val conf = new SparkConf().setAppName("jsonParsing").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)
val input=sc.textFile("hdfs://loc/data.txt")
// input.collect().foreach(println)
input.map(data=>getElem(parse(data,false),sc,sqlContext))
}
//method definition
def getElem(json:JValue)={
// Parses the json and creates array of datasets for each input record and stores the data in case class
val x= Record("xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx")
}
case class Record(summary_key: String, key: String,array_name_position:Int,Parent_Level_1:String,Parent_level_2:String,Parent_Level_3:String,Parent_level_4:String,Parent_level_5:String,
param_name_position:Integer,Array_name:String,paramname:String,paramvalue:String)
}
你測試了你的代碼嗎?我懷疑它是一個完整的工作代碼。 –
@RameshMaharjan是的,邏輯工作在斯卡拉 –
具體是什麼問題?既然你似乎已經想出瞭如何正確解析記錄(你聲稱getElem正在工作,你喜歡),那麼你唯一的問題是保存結果?您想以什麼格式保存數據? – puhlen