2016-11-18 139 views
2

我有一個需求來處理流入S3文件夾的xml文件。目前,我已經實施如下。Spark Streaming xml文件

首先,利用星火的FILESTREAM

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

對於每個RDD,檢查是否有文件被讀

if (data.count() !=0) 

寫入字符串到一個新的HDFS目錄

閱讀文件

從上面的HDFS di創建數據幀讀取教區長

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir) 

請在數據幀進行一些處理,並保存爲JSON

loaddata.write.mode("append").json("s3://mybucket/somefolder") 

不知怎的,我覺得上面的方法是非常低效的,坦率地挺孩子氣的學校。 有更好的解決方案嗎?任何幫助將不勝感激。

後續問題: 如何操縱數據框中的字段(不是列)? 我有一個複雜的嵌套xml,當我使用上面描述的方法時,我得到了一個有9列和50個奇數內部Struct數組的數據框。除了需要修剪某些字段名稱外,這很好。有沒有辦法在不爆炸數據框的情況下實現這一點,因爲我需要再次構建相同的結構?

回答

1

如果您使用的Spark 2.0你可以使其與結構化工作流:

val inputDF = spark.readStream.format("com.databricks.spark.xml") 
    .option("rowTag", "Trans") 
    .load(path) 
+0

非常感謝。我的目標環境是使用Spark 2.0.1的EMR堆棧。我會在EMR盒上嘗試你的建議。 – Vamsi

+0

如果你對上面提到的解決方案沒問題,請投票/接受。 –