I have XML that I want to parse with Spark code. I know of two approaches and am facing problems with each; I need advice on parsing XML with Spark:
- Using com.databricks.spark.xml.XmlReader
- Using HiveContext: building a DataFrame and walking the XML with XPath
If there are other ways besides these, please share. I will post my answer below.
Below is the sample code I used, but the problem here is that any change in the XML schema always forces a code change.
import Util.XmlFileUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object xPathReader extends App {

  System.setProperty("hadoop.home.dir", "C:\\hadoop\\winutils")

  val sparkConf = new SparkConf().setAppName("MstarXmlIngestion").setMaster("local[5]")
  val sc = new SparkContext(sparkConf)
  val hiveContext = new HiveContext(sc)
  val fscXmlPath = "C:\\data\\xml"

  // Read each <book> element as one raw XML string per RDD row
  val xmlRddList = XmlFileUtil.withCharset(sc, fscXmlPath, "UTF-8", "book")

  import hiveContext.implicits._
  val xmlDf = xmlRddList.toDF("xml")
  xmlDf.registerTempTable("TEMPTABLE")

  // Hive's xpath_string/xpath UDFs pull out individual fields; LATERAL VIEW explode
  // turns each repeated <price> node into its own row
  hiveContext.sql("select xpath_string(xml,\"/book/@_id\") as BookId, xpath_string(xml,\"/book/description\") as description, CurrID, Price from TEMPTABLE " +
    "LATERAL VIEW OUTER explode(xpath(xml,\"/book/market/price/text()\")) PRICE_LIST as Price " +
    "LATERAL VIEW OUTER explode(xpath(xml,\"/book/currency/price/text()\")) CurrID_LIST as CurrID").show()
}
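For reference, the XPath expressions above imply input documents shaped roughly like this (a hypothetical sample; element names are inferred from the queries, and the id attribute is written as _id because that is how the query addresses it):

<book _id="bk101">
  <description>Some description text</description>
  <market>
    <price>12.99</price>
    <price>14.99</price>
  </market>
  <currency>
    <price>USD</price>
  </currency>
</book>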
The XmlFileUtil bridge class: spark-xml's XmlFile object is package-private in Scala, but that restriction is not enforced on Java callers, so a small Java shim can expose it.
===================================================
import com.databricks.spark.xml.util.XmlFile;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;

/**
 * Utility class to access the private `XmlFile` scala object from the spark-xml package
 */
public class XmlFileUtil {
    public static RDD<String> withCharset(SparkContext context, String location, String charset, String rowTag) {
        return XmlFile.withCharset(context, location, charset, rowTag);
    }
}
Sadly, there is no generic solution that works for every XML file: if your XML changes, so does the code. In my code I used the XML data source for Spark SQL from Databricks; you can find it here.
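As an aside, if all you need is a DataFrame with an inferred schema, spark-xml also exposes a declarative reader, so the XmlFileUtil bridge is not required (a minimal sketch; the path and rowTag are placeholders):

// Requires the com.databricks:spark-xml package on the classpath
val booksDf = hiveContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load("C:\\data\\xml")
booksDf.printSchema()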
As mentioned in the question, here is the other approach:
import Util.XmlFileUtil
import com.databricks.spark.xml.XmlReader
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

object XmlSecondApproach extends App {

  System.setProperty("hadoop.home.dir", "C:\\hadoop\\winutils")

  val sparkConf = new SparkConf().setAppName("Second Approach").setMaster("local[5]")
  val sc = new SparkContext(sparkConf)
  val hiveContext = new HiveContext(sc)
  val rawXmlPath = "C:\\data\\xml\\RawXML"

  val objXMLDR = new XmlDataReader
  // hiveContext is passed in explicitly so the reader class compiles on its own
  val rawDf = objXMLDR.getRawXmlDataframe(sc, hiveContext, rawXmlPath, "book")
  rawDf.registerTempTable("tempBookTable")
  rawDf.printSchema()

  hiveContext.udf.register("customFunction", UDFWrappedArrayConverter.checkIfWrappedArray)

  // When a parent node holds more than one array we need to handle the
  // repeated parent node with explode
  hiveContext.sql("SELECT book.currency.price as curPrice, " +
    "markET.price as mktPrice from tempBookTable " +
    "LATERAL VIEW OUTER explode(book.market) Mrkt_List as markET").show()
}
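The UDFWrappedArrayConverter registered above is not shown in the post. Purely as a hypothetical reconstruction (name, signature, and semantics are all assumptions), it could be a UDF that reports whether a repeated node actually arrived as an array for the current row:

object UDFWrappedArrayConverter {
  // Assumed behaviour: true when the repeated element is present for this row
  // (spark-xml infers repeated tags as arrays; absent ones come through as null)
  val checkIfWrappedArray: Seq[String] => Boolean =
    (values: Seq[String]) => values != null
}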
// Define the supporting Scala class
class XmlDataReader {

  def getRawXmlDataframe(sc: SparkContext, hiveContext: HiveContext, xmlPath: String, rowTag: String): DataFrame = {
    // Extract each row-tag element as a raw XML string, then let XmlReader infer the schema
    val xmlRddList = XmlFileUtil.withCharset(sc, xmlPath, "UTF-8", rowTag)
    val xmlReader = new XmlReader()
    xmlReader.withAttributePrefix(Constant.ATTR_PREFIX)
      .withValueTag(Constant.VALUE_TAG)
      .withRowTag(rowTag.toLowerCase)
      .xmlRdd(hiveContext, xmlRddList)
  }
}
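The Constant object is also not shown; a plausible definition, assuming it simply mirrors spark-xml's defaults (both values are assumptions):

object Constant {
  // spark-xml prefixes XML attribute names with this string in the inferred schema
  val ATTR_PREFIX = "_"
  // spark-xml stores an element's text under this key when the element also has attributes
  val VALUE_TAG = "_VALUE"
}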
Agreed, we have to change the code for every schema change. But if we use DataFrames it still works; of course, we have to jump through some hoops. I will share my other approach below –